上文:
中,我们介绍了ZooKeeper服务器的启动过程,其中单机模式使用ZooKeeperServer启动,集群模式使用QuorumPeer启动。这次我们分别分析一下各自的消息处理过程:
从前文可以看到:
1. 在单机情况下,NettyServerCnxnFactory会启动ZooKeeperServer来处理消息:
public synchronized void startup() {  if (sessionTracker == null) {   createSessionTracker();  }  startSessionTracker();  setupRequestProcessors();  registerJMX();  state = State.RUNNING;  notifyAll(); }    消息处理器的调用如下:
protected void setupRequestProcessors() {     RequestProcessor finalProcessor = new FinalRequestProcessor(this);     RequestProcessor syncProcessor = new SyncRequestProcessor(this,      finalProcessor);     ((SyncRequestProcessor)syncProcessor).start();     firstProcessor = new PrepRequestProcessor(this, syncProcessor);     ((PrepRequestProcessor)firstProcessor).start(); }    我们看到这里构建了一条由三个请求处理器组成的处理链:PrepRequestProcessor(预处理)→ SyncRequestProcessor(写日志和快照)→ FinalRequestProcessor(最终处理),其中SyncRequestProcessor和PrepRequestProcessor各自作为独立线程启动。
1.1 第一个请求处理器:预处理器PrepRequestProcessor
@Override public void run() {  try {   while (true) {    Request request = submittedRequests.take();    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;    if (request.type == OpCode.ping) {     traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;    }    if (LOG.isTraceEnabled()) {     ZooTrace.logRequest(LOG, traceMask, 'P', request, "");    }    if (Request.requestOfDeath == request) {     break;    }    pRequest(request);   }  } catch (RequestProcessorException e) {   if (e.getCause() instanceof XidRolloverException) {    LOG.info(e.getCause().getMessage());   }   handleException(this.getName(), e);  } catch (Exception e) {   handleException(this.getName(), e);  }  LOG.info("PrepRequestProcessor exited loop!"); }    可以看到,while(true)循环不断地从submittedRequests队列中取出请求并处理,其中pRequest(request)(原文标红部分)是处理的主体。
/**  * This method will be called inside the ProcessRequestThread, which is a  * singleton, so there will be a single thread calling this code.  *  * @param request  */ protected void pRequest(Request request) throws RequestProcessorException {  // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +  // request.type + " id = 0x" + Long.toHexString(request.sessionId));  request.setHdr(null);  request.setTxn(null);  try {   switch (request.type) {   case OpCode.createContainer:   case OpCode.create:   case OpCode.create2:    CreateRequest create2Request = new CreateRequest();    pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);    break;   case OpCode.deleteContainer:   case OpCode.delete:    DeleteRequest deleteRequest = new DeleteRequest();    pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);    break;   case OpCode.setData:    SetDataRequest setDataRequest = new SetDataRequest();        pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);    break;   case OpCode.reconfig:    ReconfigRequest reconfigRequest = new ReconfigRequest();    ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);    pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);    break;   case OpCode.setACL:    SetACLRequest setAclRequest = new SetACLRequest();        pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);    break;   case OpCode.check:    CheckVersionRequest checkRequest = new CheckVersionRequest();         pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);    break;   case OpCode.multi:    MultiTransactionRecord multiRequest = new MultiTransactionRecord();    try {     ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);    } catch(IOException e) {     request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),       Time.currentWallTime(), OpCode.multi));     throw 
e;    }    List<Txn> txns = new ArrayList<Txn>();    //Each op in a multi-op must have the same zxid!    long zxid = zks.getNextZxid();    KeeperException ke = null;    //Store off current pending change records in case we need to rollback    Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);    for(Op op: multiRequest) {     Record subrequest = op.toRequestRecord();     int type;     Record txn;     /* If we've already failed one of the ops, don't bother      * trying the rest as we know it's going to fail and it      * would be confusing in the logfiles.      */     if (ke != null) {      type = OpCode.error;      txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());     }     /* Prep the request and convert to a Txn */     else {      try {       pRequest2Txn(op.getType(), zxid, request, subrequest, false);       type = request.getHdr().getType();       txn = request.getTxn();      } catch (KeeperException e) {       ke = e;       type = OpCode.error;       txn = new ErrorTxn(e.code().intValue());       LOG.info("Got user-level KeeperException when processing "         + request.toString() + " aborting remaining multi ops."         + " Error Path:" + e.getPath()         + " Error:" + e.getMessage());       request.setException(e);       /* Rollback change records from failed multi-op */       rollbackPendingChanges(zxid, pendingChanges);      }     }     //FIXME: I don't want to have to serialize it here and then     //    immediately deserialize in next processor. But I'm     //    not sure how else to get the txn stored into our list.     
ByteArrayOutputStream baos = new ByteArrayOutputStream();     BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);     txn.serialize(boa, "request") ;     ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());     txns.add(new Txn(type, bb.array()));    }    request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,      Time.currentWallTime(), request.type));    request.setTxn(new MultiTxn(txns));    break;   //create/close session don't require request record   case OpCode.createSession:   case OpCode.closeSession:    if (!request.isLocalSession()) {     pRequest2Txn(request.type, zks.getNextZxid(), request,         null, true);    }    break;   //All the rest don't need to create a Txn - just verify session   case OpCode.sync:   case OpCode.exists:   case OpCode.getData:   case OpCode.getACL:   case OpCode.getChildren:   case OpCode.getChildren2:   case OpCode.ping:   case OpCode.setWatches:   case OpCode.checkWatches:   case OpCode.removeWatches:    zks.sessionTracker.checkSession(request.sessionId,      request.getOwner());    break;   default:    LOG.warn("unknown type " + request.type);    break;   }  } catch (KeeperException e) {   if (request.getHdr() != null) {    request.getHdr().setType(OpCode.error);    request.setTxn(new ErrorTxn(e.code().intValue()));   }   LOG.info("Got user-level KeeperException when processing "     + request.toString()     + " Error Path:" + e.getPath()     + " Error:" + e.getMessage());   request.setException(e);  } catch (Exception e) {   // log at error level as we are returning a marshalling   // error to the user   LOG.error("Failed to process " + request, e);   StringBuilder sb = new StringBuilder();   ByteBuffer bb = request.request;   if(bb != null){    bb.rewind();    while (bb.hasRemaining()) {     sb.append(Integer.toHexString(bb.get() & 0xff));    }   } else {    sb.append("request buffer is null");   }   LOG.error("Dumping request buffer: 0x" + sb.toString());   if (request.getHdr() != null) {    
request.getHdr().setType(OpCode.error);    request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));   }  }  request.zxid = zks.getZxid();  nextProcessor.processRequest(request); }    抛开异常处理的逻辑,该方法根据request的type选择对应的处理分支。该方法在ProcessRequestThread内部调用,而这个线程是单例的,因此只会有一个线程执行这段代码。下面以create请求为例(原文标红部分,即create分支中调用的pRequest2Txn方法),了解其工作机制:
CreateRequest createRequest = (CreateRequest)record; if (deserialize) {  ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); } CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); validateCreateRequest(createMode, request); String path = createRequest.getPath(); String parentPath = validatePathForCreate(path, request.sessionId); List<ACL> listACL = fixupACL(path, request.authInfo, createRequest.getAcl()); ChangeRecord parentRecord = getRecordForPath(parentPath); checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo); int parentCVersion = parentRecord.stat.getCversion(); if (createMode.isSequential()) {  path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion); } validatePath(path, request.sessionId); try {  if (getRecordForPath(path) != null) {   throw new KeeperException.NodeExistsException(path);  } } catch (KeeperException.NoNodeException e) {  // ignore this one } boolean ephemeralParent = (parentRecord.stat.getEphemeralOwner() != 0) &&   (parentRecord.stat.getEphemeralOwner() != DataTree.CONTAINER_EPHEMERAL_OWNER); if (ephemeralParent) {  throw new KeeperException.NoChildrenForEphemeralsException(path); } int newCversion = parentRecord.stat.getCversion()+1; if (type == OpCode.createContainer) {  request.setTxn(new CreateContainerTxn(path, createRequest.getData(), listACL, newCversion)); } else {  request.setTxn(new CreateTxn(path, createRequest.getData(), listACL, createMode.isEphemeral(),    newCversion)); } StatPersisted s = new StatPersisted(); if (createMode.isEphemeral()) {  s.setEphemeralOwner(request.sessionId); } parentRecord = parentRecord.duplicate(request.getHdr().getZxid()); parentRecord.childCount++; parentRecord.stat.setCversion(newCversion); addChangeRecord(parentRecord); addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL)); break;    调用方法,处理变化:
private void addChangeRecord(ChangeRecord c) {         synchronized (zks.outstandingChanges) {             zks.outstandingChanges.add(c);             zks.outstandingChangesForPath.put(c.path, c);         }     }   继续向下
private void addChangeRecord(ChangeRecord c) {         synchronized (zks.outstandingChanges) {             zks.outstandingChanges.add(c);             zks.outstandingChangesForPath.put(c.path, c);         }     }   其中:outstandingChanges是一个ChangeRecord列表,outstandingChangesForPath是以节点路径为键的ChangeRecord映射(Map),定义如下:
// Pending change records: addChangeRecord appends to this list, and
// FinalRequestProcessor.processRequest removes entries from the head while
// their zxid is <= the zxid of the transaction being processed.
final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
// this data structure must be accessed under the outstandingChanges lock
// Maps a node path to the ChangeRecord most recently put for that path.
final HashMap<String, ChangeRecord> outstandingChangesForPath =
new HashMap<String, ChangeRecord>();
ChangeRecord是一个数据结构,方便PrepRequestProcessor和FinalRequestProcessor之间共享信息。
ChangeRecord(long zxid, String path, StatPersisted stat, int childCount,  List<ACL> acl) {     this.zxid = zxid;     this.path = path;     this.stat = stat;     this.childCount = childCount;     this.acl = acl; }    1.2 先看一下最终请求处理器FinalRequestProcessor。这个请求处理器真正应用与请求相关联的事务,并为查询请求提供服务。它总是处于请求处理链的末端(不会有下一个请求处理器),故此得名。它是如何处理请求的呢?
public void processRequest(Request request) {  if (LOG.isDebugEnabled()) {   LOG.debug("Processing request:: " + request);  }  // request.addRQRec(">final");  long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;  if (request.type == OpCode.ping) {   traceMask = ZooTrace.SERVER_PING_TRACE_MASK;  }  if (LOG.isTraceEnabled()) {   ZooTrace.logRequest(LOG, traceMask, 'E', request, "");  }  ProcessTxnResult rc = null;  synchronized (zks.outstandingChanges) {   // Need to process local session requests   rc = zks.processTxn(request);   // request.hdr is set for write requests, which are the only ones   // that add to outstandingChanges.   if (request.getHdr() != null) {    TxnHeader hdr = request.getHdr();    Record txn = request.getTxn();    long zxid = hdr.getZxid();    while (!zks.outstandingChanges.isEmpty()        && zks.outstandingChanges.get(0).zxid <= zxid) {     ChangeRecord cr = zks.outstandingChanges.remove(0);     if (cr.zxid < zxid) {      LOG.warn("Zxid outstanding " + cr.zxid         + " is less than current " + zxid);     }     if (zks.outstandingChangesForPath.get(cr.path) == cr) {      zks.outstandingChangesForPath.remove(cr.path);     }    }   }   // do not add non quorum packets to the queue.   if (request.isQuorum()) {    zks.getZKDatabase().addCommittedProposal(request);   }  }  // ZOOKEEPER-558:  // In some cases the server does not close the connection (e.g., closeconn buffer  // was not being queued — ZOOKEEPER-558) properly. This happens, for example,  // when the client closes the connection. The server should still close the session, though.  // Calling closeSession() after losing the cnxn, results in the client close session response being dropped.  if (request.type == OpCode.closeSession && connClosedByClient(request)) {   // We need to check if we can close the session id.   // Sometimes the corresponding ServerCnxnFactory could be null because   // we are just playing diffs from the leader.   
if (closeSession(zks.serverCnxnFactory, request.sessionId) ||     closeSession(zks.secureServerCnxnFactory, request.sessionId)) {    return;   }  }  if (request.cnxn == null) {   return;  }  ServerCnxn cnxn = request.cnxn;  String lastOp = "NA";  zks.decInProcess();  Code err = Code.OK;  Record rsp = null;  try {   if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {    /*     * When local session upgrading is disabled, leader will     * reject the ephemeral node creation due to session expire.     * However, if this is the follower that issue the request,     * it will have the correct error code, so we should use that     * and report to user     */    if (request.getException() != null) {     throw request.getException();    } else {     throw KeeperException.create(KeeperException.Code       .get(((ErrorTxn) request.getTxn()).getErr()));    }   }   KeeperException ke = request.getException();   if (ke != null && request.type != OpCode.multi) {    throw ke;   }   if (LOG.isDebugEnabled()) {    LOG.debug("{}",request);   }   switch (request.type) {   case OpCode.ping: {    zks.serverStats().updateLatency(request.createTime);    lastOp = "PING";    cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,      request.createTime, Time.currentElapsedTime());    cnxn.sendResponse(new ReplyHeader(-2,      zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");    return;   }   case OpCode.createSession: {    zks.serverStats().updateLatency(request.createTime);    lastOp = "SESS";    cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,      request.createTime, Time.currentElapsedTime());    zks.finishSessionInit(request.cnxn, true);    return;   }   case OpCode.multi: {    lastOp = "MULT";    rsp = new MultiResponse() ;    for (ProcessTxnResult subTxnResult : rc.multiResult) {     OpResult subResult ;     switch (subTxnResult.type) {      case OpCode.check:       subResult = new CheckResult();       break;   
   case OpCode.create:       subResult = new CreateResult(subTxnResult.path);       break;      case OpCode.create2:      case OpCode.createContainer:       subResult = new CreateResult(subTxnResult.path, subTxnResult.stat);       break;      case OpCode.delete:      case OpCode.deleteContainer:       subResult = new DeleteResult();       break;      case OpCode.setData:       subResult = new SetDataResult(subTxnResult.stat);       break;      case OpCode.error:       subResult = new ErrorResult(subTxnResult.err) ;       break;      default:       throw new IOException("Invalid type of op");     }     ((MultiResponse)rsp).add(subResult);    }    break;   }   case OpCode.create: {    lastOp = "CREA";    rsp = new CreateResponse(rc.path);    err = Code.get(rc.err);    break;   }   case OpCode.create2:   case OpCode.createContainer: {    lastOp = "CREA";    rsp = new Create2Response(rc.path, rc.stat);    err = Code.get(rc.err);    break;   }   case OpCode.delete:   case OpCode.deleteContainer: {    lastOp = "DELE";    err = Code.get(rc.err);    break;   }   case OpCode.setData: {    lastOp = "SETD";    rsp = new SetDataResponse(rc.stat);    err = Code.get(rc.err);    break;   }        case OpCode.reconfig: {    lastOp = "RECO";          rsp = new GetDataResponse(((QuorumZooKeeperServer)zks).self.getQuorumVerifier().toString().getBytes(), rc.stat);    err = Code.get(rc.err);    break;   }   case OpCode.setACL: {    lastOp = "SETA";    rsp = new SetACLResponse(rc.stat);    err = Code.get(rc.err);    break;   }   case OpCode.closeSession: {    lastOp = "CLOS";    err = Code.get(rc.err);    break;   }   case OpCode.sync: {    lastOp = "SYNC";    SyncRequest syncRequest = new SyncRequest();    ByteBufferInputStream.byteBuffer2Record(request.request,      syncRequest);    rsp = new SyncResponse(syncRequest.getPath());    break;   }   case OpCode.check: {    lastOp = "CHEC";    rsp = new SetDataResponse(rc.stat);    err = Code.get(rc.err);    break;   }   case OpCode.exists: 
{    lastOp = "EXIS";    // TODO we need to figure out the security requirement for this!    ExistsRequest existsRequest = new ExistsRequest();    ByteBufferInputStream.byteBuffer2Record(request.request,      existsRequest);    String path = existsRequest.getPath();    if (path.indexOf('\0') != -1) {     throw new KeeperException.BadArgumentsException();    }    Stat stat = zks.getZKDatabase().statNode(path, existsRequest      .getWatch() ? cnxn : null);    rsp = new ExistsResponse(stat);    break;   }   case OpCode.getData: {    lastOp = "GETD";    GetDataRequest getDataRequest = new GetDataRequest();    ByteBufferInputStream.byteBuffer2Record(request.request,      getDataRequest);    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());    if (n == null) {     throw new KeeperException.NoNodeException();    }    Long aclL;    synchronized(n) {     aclL = n.acl;    }    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclL),      ZooDefs.Perms.READ,      request.authInfo);    Stat stat = new Stat();    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,      getDataRequest.getWatch() ? cnxn : null);    rsp = new GetDataResponse(b, stat);    break;   }   case OpCode.setWatches: {    lastOp = "SETW";    SetWatches setWatches = new SetWatches();    // XXX We really should NOT need this!!!!    
request.request.rewind();    ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);    long relativeZxid = setWatches.getRelativeZxid();    zks.getZKDatabase().setWatches(relativeZxid,      setWatches.getDataWatches(),      setWatches.getExistWatches(),      setWatches.getChildWatches(), cnxn);    break;   }   case OpCode.getACL: {    lastOp = "GETA";    GetACLRequest getACLRequest = new GetACLRequest();    ByteBufferInputStream.byteBuffer2Record(request.request,      getACLRequest);    Stat stat = new Stat();    List<ACL> acl =     zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);    rsp = new GetACLResponse(acl, stat);    break;   }   case OpCode.getChildren: {    lastOp = "GETC";    GetChildrenRequest getChildrenRequest = new GetChildrenRequest();    ByteBufferInputStream.byteBuffer2Record(request.request,      getChildrenRequest);    DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());    if (n == null) {     throw new KeeperException.NoNodeException();    }    Long aclG;    synchronized(n) {     aclG = n.acl;    }    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),      ZooDefs.Perms.READ,      request.authInfo);    List<String> children = zks.getZKDatabase().getChildren(      getChildrenRequest.getPath(), null, getChildrenRequest        .getWatch() ? 
cnxn : null);    rsp = new GetChildrenResponse(children);    break;   }   case OpCode.getChildren2: {    lastOp = "GETC";    GetChildren2Request getChildren2Request = new GetChildren2Request();    ByteBufferInputStream.byteBuffer2Record(request.request,      getChildren2Request);    Stat stat = new Stat();    DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());    if (n == null) {     throw new KeeperException.NoNodeException();    }    Long aclG;    synchronized(n) {     aclG = n.acl;    }    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().convertLong(aclG),      ZooDefs.Perms.READ,      request.authInfo);    List<String> children = zks.getZKDatabase().getChildren(      getChildren2Request.getPath(), stat, getChildren2Request        .getWatch() ? cnxn : null);    rsp = new GetChildren2Response(children, stat);    break;   }   case OpCode.checkWatches: {    lastOp = "CHKW";    CheckWatchesRequest checkWatches = new CheckWatchesRequest();    ByteBufferInputStream.byteBuffer2Record(request.request,      checkWatches);    WatcherType type = WatcherType.fromInt(checkWatches.getType());    boolean containsWatcher = zks.getZKDatabase().containsWatcher(      checkWatches.getPath(), type, cnxn);    if (!containsWatcher) {     String msg = String.format(Locale.ENGLISH, "%s (type: %s)",       new Object[] { checkWatches.getPath(), type });     throw new KeeperException.NoWatcherException(msg);    }    break;   }   case OpCode.removeWatches: {    lastOp = "REMW";    RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();    ByteBufferInputStream.byteBuffer2Record(request.request,      removeWatches);    WatcherType type = WatcherType.fromInt(removeWatches.getType());    boolean removed = zks.getZKDatabase().removeWatch(      removeWatches.getPath(), type, cnxn);    if (!removed) {     String msg = String.format(Locale.ENGLISH, "%s (type: %s)",       new Object[] { removeWatches.getPath(), type });     throw new 
KeeperException.NoWatcherException(msg);    }    break;   }   }  } catch (SessionMovedException e) {   // session moved is a connection level error, we need to tear   // down the connection otw ZOOKEEPER-710 might happen   // ie client on slow follower starts to renew session, fails   // before this completes, then tries the fast follower (leader)   // and is successful, however the initial renew is then   // successfully fwd/processed by the leader and as a result   // the client and leader disagree on where the client is most   // recently attached (and therefore invalid SESSION MOVED generated)   cnxn.sendCloseSession();   return;  } catch (KeeperException e) {   err = e.code();  } catch (Exception e) {   // log at error level as we are returning a marshalling   // error to the user   LOG.error("Failed to process " + request, e);   StringBuilder sb = new StringBuilder();   ByteBuffer bb = request.request;   bb.rewind();   while (bb.hasRemaining()) {    sb.append(Integer.toHexString(bb.get() & 0xff));   }   LOG.error("Dumping request buffer: 0x" + sb.toString());   err = Code.MARSHALLINGERROR;  }  long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();  ReplyHeader hdr =   new ReplyHeader(request.cxid, lastZxid, err.intValue());  zks.serverStats().updateLatency(request.createTime);  cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,     request.createTime, Time.currentElapsedTime());  try {   cnxn.sendResponse(hdr, rsp, "response");   if (request.type == OpCode.closeSession) {    cnxn.sendCloseSession();   }  } catch (IOException e) {   LOG.error("FIXMSG",e);  } }    第一步,根据共享的outstandingChanges,
先处理事务后处理session:
private ProcessTxnResult processTxn(Request request, TxnHeader hdr,          Record txn) {  ProcessTxnResult rc;  int opCode = request != null ? request.type : hdr.getType();  long sessionId = request != null ? request.sessionId : hdr.getClientId();  if (hdr != null) {   rc = getZKDatabase().processTxn(hdr, txn);  } else {   rc = new ProcessTxnResult();  }  if (opCode == OpCode.createSession) {   if (hdr != null && txn instanceof CreateSessionTxn) {    CreateSessionTxn cst = (CreateSessionTxn) txn;    sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());   } else if (request != null && request.isLocalSession()) {    request.request.rewind();    int timeout = request.request.getInt();    request.request.rewind();    sessionTracker.addSession(request.sessionId, timeout);   } else {    LOG.warn("*****>>>>> Got "      + txn.getClass() + " "      + txn.toString());   }  } else if (opCode == OpCode.closeSession) {   sessionTracker.removeSession(sessionId);  }  return rc; }    处理事务, 本地和数据库的不同分支, DataTree创建节点
CreateTxn createTxn = (CreateTxn) txn; rc.path = createTxn.getPath(); createNode( createTxn.getPath(), createTxn.getData(), createTxn.getAcl(), createTxn.getEphemeral() ? header.getClientId() : 0, createTxn.getParentCVersion(), header.getZxid(), header.getTime(), null); break;
新增一个节点的逻辑是:
/**  * Add a new node to the DataTree.  * @param path  *      Path for the new node.  * @param data  *   Data to store in the node.  * @param acl  *   Node acls  * @param ephemeralOwner  *   the session id that owns this node. -1 indicates this is not  *   an ephemeral node.  * @param zxid  *   Transaction ID  * @param time  * @param outputStat  *      A Stat object to store Stat output results into.  * @throws NodeExistsException   * @throws NoNodeException   * @throws KeeperException  */ public void createNode(final String path, byte data[], List<ACL> acl,   long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat)   throws KeeperException.NoNodeException,   KeeperException.NodeExistsException {  int lastSlash = path.lastIndexOf('/');  String parentName = path.substring(0, lastSlash);  String childName = path.substring(lastSlash + 1);  StatPersisted stat = new StatPersisted();  stat.setCtime(time);  stat.setMtime(time);  stat.setCzxid(zxid);  stat.setMzxid(zxid);  stat.setPzxid(zxid);  stat.setVersion(0);  stat.setAversion(0);  stat.setEphemeralOwner(ephemeralOwner);  DataNode parent = nodes.get(parentName);  if (parent == null) {   throw new KeeperException.NoNodeException();  }  synchronized (parent) {   Set<String> children = parent.getChildren();   if (children != null && children.contains(childName)) {    throw new KeeperException.NodeExistsException();   }   if (parentCVersion == -1) {    parentCVersion = parent.stat.getCversion();    parentCVersion++;   }   parent.stat.setCversion(parentCVersion);   parent.stat.setPzxid(zxid);   Long longval = convertAcls(acl);   DataNode child = new DataNode(data, longval, stat);   parent.addChild(childName);   nodes.put(path, child);   if (ephemeralOwner == CONTAINER_EPHEMERAL_OWNER) {    containers.add(path);   } else if (ephemeralOwner != 0) {    HashSet<String> list = ephemerals.get(ephemeralOwner);    if (list == null) {     list = new HashSet<String>();     ephemerals.put(ephemeralOwner, list); 
   }    synchronized (list) {     list.add(path);    }   }   if (outputStat != null) {    child.copyStat(outputStat);   }  }  // now check if its one of the zookeeper node child  if (parentName.startsWith(quotaZookeeper)) {   // now check if its the limit node   if (Quotas.limitNode.equals(childName)) {    // this is the limit node    // get the parent and add it to the trie    pTrie.addPath(parentName.substring(quotaZookeeper.length()));   }   if (Quotas.statNode.equals(childName)) {    updateQuotaForPath(parentName      .substring(quotaZookeeper.length()));   }  }  // also check to update the quotas for this node  String lastPrefix = getMaxPrefixWithQuota(path);  if(lastPrefix != null) {   // ok we have some match and need to update   updateCount(lastPrefix, 1);   updateBytes(lastPrefix, data == null ? 0 : data.length);  }  dataWatches.triggerWatch(path, Event.EventType.NodeCreated);  childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,    Event.EventType.NodeChildrenChanged); }    最后的逻辑是触发创建节点和子节点改变事件。
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {  WatchedEvent e = new WatchedEvent(type,    KeeperState.SyncConnected, path);  HashSet<Watcher> watchers;  synchronized (this) {   watchers = watchTable.remove(path);   if (watchers == null || watchers.isEmpty()) {    if (LOG.isTraceEnabled()) {     ZooTrace.logTraceMessage(LOG,       ZooTrace.EVENT_DELIVERY_TRACE_MASK,       "No watchers for " + path);    }    return null;   }   for (Watcher w : watchers) {    HashSet<String> paths = watch2Paths.get(w);    if (paths != null) {     paths.remove(path);    }   }  }  for (Watcher w : watchers) {   if (supress != null && supress.contains(w)) {    continue;   }   w.process(e);  }  return watchers; }    WatcherManager调用定义的watcher进行事件处理。
1.3 再看负责持久化的请求处理器SyncRequestProcessor(它在独立线程中异步运行)
@Override public void run() {  try {   int logCount = 0;   // we do this in an attempt to ensure that not all of the servers   // in the ensemble take a snapshot at the same time   int randRoll = r.nextInt(snapCount/2);   while (true) {    Request si = null;    if (toFlush.isEmpty()) {     si = queuedRequests.take();    } else {     si = queuedRequests.poll();     if (si == null) {      flush(toFlush);      continue;     }    }    if (si == requestOfDeath) {     break;    }    if (si != null) {     // track the number of records written to the log     if (zks.getZKDatabase().append(si)) {      logCount++;      if (logCount > (snapCount / 2 + randRoll)) {       randRoll = r.nextInt(snapCount/2);       // roll the log       zks.getZKDatabase().rollLog();       // take a snapshot       if (snapInProcess != null && snapInProcess.isAlive()) {        LOG.warn("Too busy to snap, skipping");       } else {        snapInProcess = new ZooKeeperThread("Snapshot Thread") {          public void run() {           try {            zks.takeSnapshot();           } catch(Exception e) {            LOG.warn("Unexpected exception", e);           }          }         };        snapInProcess.start();       }       logCount = 0;      }     } else if (toFlush.isEmpty()) {      // optimization for read heavy workloads      // iff this is a read, and there are no pending      // flushes (writes), then just pass this to the next      // processor      if (nextProcessor != null) {       nextProcessor.processRequest(si);       if (nextProcessor instanceof Flushable) {        ((Flushable)nextProcessor).flush();       }      }      continue;     }     toFlush.add(si);     if (toFlush.size() > 1000) {      flush(toFlush);     }    }   }  } catch (Throwable t) {   handleException(this.getName(), t);  } finally{   running = false;  }  LOG.info("SyncRequestProcessor exited!"); }    异步处理日志和快照,启动ZooKeeperThread线程来生成快照。
public void takeSnapshot(){  try {   txnLogFactory.save(zkDb.getDataTree(), zkDb.getSessionWithTimeOuts());  } catch (IOException e) {   LOG.error("Severe unrecoverable error, exiting", e);   // This is a severe error that we cannot recover from,   // so we need to exit   System.exit(10);  } }    FileTxnSnapLog是个工具类,用来帮助处理事务日志(txnlog)和快照(snapshot)。
/**  * save the datatree and the sessions into a snapshot  * @param dataTree the datatree to be serialized onto disk  * @param sessionsWithTimeouts the sesssion timeouts to be  * serialized onto disk  * @throws IOException  */ public void save(DataTree dataTree,  ConcurrentHashMap<Long, Integer> sessionsWithTimeouts)     throws IOException {     long lastZxid = dataTree.lastProcessedZxid;     File snapshotFile = new File(snapDir, Util.makeSnapshotName(lastZxid));     LOG.info("Snapshotting: 0x{} to {}", Long.toHexString(lastZxid),      snapshotFile);     snapLog.serialize(dataTree, sessionsWithTimeouts, snapshotFile); }    持久化为文件
/**  * serialize the datatree and session into the file snapshot  * @param dt the datatree to be serialized  * @param sessions the sessions to be serialized  * @param snapShot the file to store snapshot into  */ public synchronized void serialize(DataTree dt, Map<Long, Integer> sessions, File snapShot)   throws IOException {  if (!close) {   OutputStream sessOS = new BufferedOutputStream(new FileOutputStream(snapShot));   CheckedOutputStream crcOut = new CheckedOutputStream(sessOS, new Adler32());   //CheckedOutputStream cout = new CheckedOutputStream()   OutputArchive oa = BinaryOutputArchive.getArchive(crcOut);   FileHeader header = new FileHeader(SNAP_MAGIC, VERSION, dbId);   serialize(dt,sessions,oa, header);   long val = crcOut.getChecksum().getValue();   oa.writeLong(val, "val");   oa.writeString("/", "path");   sessOS.flush();   crcOut.close();   sessOS.close();  } }    至此,整个流程已经走完。
2. 集群情况下
集群情况和单机略有不同,集群中使用QuorumPeer来启动ServerCnxnFactory,绑定本地地址
@Override     public void start() {         LOG.info("binding to port " + localAddress);         parentChannel = bootstrap.bind(localAddress);     }   后面的逻辑基本差不多,不再赘述。
从上面的代码流程中,我们可以看出:服务器要么通过NIO、要么通过Netty框架来接收请求。请求先由PrepRequestProcessor接收并进行包装,然后根据请求类型的不同设置共享数据(outstandingChanges);接着由SyncRequestProcessor写入事务日志并按需生成快照,写请求在日志刷盘之前不会进入下一个请求处理器;最后由FinalRequestProcessor作为处理链的终结者,根据命令类型改变db的内容,发送响应消息,并触发watcher的处理程序。