Zookeeper服务器集群启动

news/2024/7/9 22:09:15 标签: zookeeper, 分布式, 集群

集群版服务器启动" style="margin:30px 0px 0px; padding:0px; color:rgb(51,51,51); font-size:20px; font-weight:normal; line-height:1.5; border-bottom-color:rgb(204,204,204); font-family:Arial,sans-serif"> 1.集群版服务器启动

       集群版服务器启动流程如下图所示。

1.1 预启动

(1)统一由QuorumPeerMain作为启动类。

(2)解析配置文件zoo.cfg。

(3)创建并启动历史文件清理器DatadirCleanupManager。

(4)判断当前是集群模式还是单机模式的启动。

       在集群模式中,由于已经在zoo.cfg中配置了多个服务器地址,因此此处会选择集群模式启动ZooKeeper。

public class QuorumPeerMain {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);
 
    private static final String USAGE = "Usage: QuorumPeerMain configfile";
 
    protected QuorumPeer quorumPeer;
 
    /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }
 
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);  //解析配置文件zoo.cfg
        }
 
        // Start and schedule the the purge task  //创建并启动历史文件清理器
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
 
        if (args.length == 1 && config.servers.size() > 0) {   //集群模式
            runFromConfig(config); 
        } else {   //单机模式
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);  // 集群模式
        }
    }
 
    public void runFromConfig(QuorumPeerConfig config) throws IOException {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }
   
      LOG.info("Starting quorum peer");
      try {
                 
          //创建ServerCnxnFactory。
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          //初始化ServerCnxnFactory。
          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());
   
          //创建QuorumPeer实例。
          quorumPeer = new QuorumPeer();
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
  
          //创建ZooKeeper数据管理器FileTnxSnapLog。
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      new File(config.getDataLogDir()),
                      new File(config.getDataDir())));
          //设置集群机器节点
          quorumPeer.setQuorumPeers(config.getServers());
          //设置选举的算法
          quorumPeer.setElectionType(config.getElectionAlg());
          //设置机器的SID
          quorumPeer.setMyid(config.getServerId());
          //设置服务器tickTime和会话超时时间限制
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          //??
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
           
          //设置选举的过半算法
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
           
          quorumPeer.setCnxnFactory(cnxnFactory);
          //创建内存数据库ZKDatabase
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          //设置本机初始化角色
          quorumPeer.setLearnerType(config.getPeerType());
   
          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }
}

1.2 初始化

(1)创建ServerCnxnFactory。

(2)初始化ServerCnxnFactory。

(3)创建ZooKeeper数据管理器FileTnxSnapLog。

(4)创建QuorumPeer实例。

     Quorum是集群模式下特有的现象,是ZooKe服务器实例(ZooKeeperServer)的托管者,从集群层面看,QuorumPeer代表了ZooKeeper集群中的一台机器。在运行期间,QuorumPeer会不断检测当前服务器实例的运行状态,同时根据情况发起Leader选举。

/**
 * The servers that make up the cluster
 */
protected Map<Long, QuorumServer> quorumPeers;
public static class QuorumServer {  
    public InetSocketAddress addr;
    public InetSocketAddress electionAddr; 
    public long id;   
    public LearnerType type = LearnerType.PARTICIPANT;
}
  
@Override
public synchronized void start() {
    loadDataBase(); //恢复本地数据
    cnxnFactory.start(); //进入NIOServerCnxnFactory的start方法 启动ServerCnxnFactory主线程    
    startLeaderElection();  //进入选举
    super.start(); //此类继承了thread方法,所以会执行自己实现的run方法。
}
  
synchronized public void startLeaderElection() {
   try {
      currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
   } catch(IOException e) {
      RuntimeException re = new RuntimeException(e.getMessage());
      re.setStackTrace(e.getStackTrace());
      throw re;
   }
    for (QuorumServer p : getView().values()) {
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    if (electionType == 0) {
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);
}
  
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;
             
    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

(5)创建内存数据库ZKDatabase。

     ZKdatebase是ZooKeeper的内存数据库,负责管理ZooKeeper的所有会话记录以及DataTree和事务日志的存储。

(6)初始化QuorumPeer。

      QuorumPeer是ZooKeeperServer的托管者,需要将一些核心组件注册到QuorumPeer中去,包括FileTnxSnapLog、ServerCnxnFactory和ZKDatabase。同时ZooKe还会对QuorumPeer配置一些参数,包括服务器地址列表、Leader选举算法和会话超时时间限制等。

(7)恢复本地数据。

(8)启动ServerCnxnFactory主线程。

(9)启动QuorumPeer主线程。

@Override
public void run() {
    setName("QuorumPeer" + "[myid=" + getId() + "]" +
            cnxnFactory.getLocalAddress());
 
    LOG.debug("Starting quorum peer");
    ... ...
 
    try {
        /*
         * Main loop
         */
        while (running) {
            switch (getPeerState()) {
            case LOOKING:   //开始选举
                LOG.info("LOOKING");
 
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");
 
                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                            logFactory, this,
                            new ZooKeeperServer.BasicDataTreeBuilder(),
                            this.zkDb);
 
                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:  //作为观测者
                try {
                    LOG.info("OBSERVING");
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );                       
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:  //作为Follower
                try {
                    LOG.info("FOLLOWING");
                    setFollower(makeFollower(logFactory));
                    follower.followLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING:   //作为leader
                LOG.info("LEADING");
                try {
                    setLeader(makeLeader(logFactory));
                    leader.lead();
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}

1.3 Leader选举

(1) 初始化Leader选举。

        Leader选举可以说是集群和单机模式启动ZooKeeper最大的不同点。ZooKeeper首先会根据自身的SID(服务器ID)、lastLoggedZxid(最新的ZXID)和当前的服务器epoch(currentEpoch)来生成一个初始化的投票。简单地讲,在初始化过程中,每个服务器都会给自己投票。然后,ZooKeeper会根据zoo.cfg中的配置,创建相应的Leader选举算法实现。

       在初始化阶段,ZooKeeper会创建Leader选举所需要的网络I/O层QuorumCnxManager,同时启动对Leader选举端口的监听,等待集群中其他服务器创建连接。    

(2) 注册JMX服务。

(3)检测当前服务器状态。

       上文已经提到,QuorumPeer是ZooKeeper服务器实例的托管者,在运行期间,QuorumPeer的核心工作就是不断地检测当前服务器的状态,并作出相应的处理。在正常情况下,ZooKeeper服务器的状态在LOOKING、LEADING和FOLLOWING/OBSERVING之间切换。而在启动阶段,QuorumPeer的初始化状态是LOOKING,因此开始进行Leader选举。

(4)Leader选举。

        ZooKeeper的Leader选举过程,简单地讲,就是一个集群中所有机器相互之间进行一系列投票,选举出最适合的机器成为Leader,同时其他机器成为Follower或是Observer的集群及其角色初始化过程。关于机器选举算法,简而言之,就是集群中哪个机器处理的数据越新(通常我们根据每个服务器处理过的最大ZXID来比较确定其数据是否更新),其越有可能成为Leader。

1.4 Leader和Follower启动期交互过程

      到这里为止,ZooKeeper已经完成了Leader选举,并且集群中每个服务器都已经确定了自己的角色。通常情况下就分为Leader和Follower两种角色。Leader和Follower在启动期间的大致交互流程如下图。

(1)创建Leader服务器和Follower服务器。

       完成Leader选举之后,每个服务器都会根据自己的服务器角色创建相应的服务器实例,并开始进入各自角色的主流程。

(2)Leader服务器启动Follower接收器LearnerCnxAcceptor。

       在ZooKeeper集群运行期间,Leader服务器需要和所有其余的服务器(Learner)保持连接以确定集群的机器存活情况。LearnerCnxAcceptor接收器用于负责接收所有非Leader服务器的连接请求。

(3)Learner服务器开始和Leader建立连接。

       所有的Learner服务器在启动完毕后,会从Leader选举的投票结果中找到当前集群中的Leader服务器,然后与其建立连接。

(4)Leader服务器创建LearnerHandler。

        Leader接收来自其他机器的连接创建请求后,会创建一个LearnerHandler实例。每个LearnerHandler实例都对应了一个Leader与Learner服务器之间的连接,其负责Leader和Learner服务器之间机会所有的消息通信和数据同步。

(5)向Leader注册。

      当和Leader建立连接后,Learner就会开始向Leader进行注册。所谓的注册就是将Learner服务器自己的基本信息发送给Leader服务器。这些基本信息我们称之为LearnerInfo,包括当前服务器的SID和服务器最新的ZXID。

(6)Leader解析Learner信息,计算新的epoch。

      Leader服务器在接收到Learner的基本信息后,会解析出该Learner的SID和ZXID,然后根据该Learner的ZXID解析出其对应的epoch_of_learner,和当前Leader的服务器的epoch_of_leader进行比较,如果改Learner的epoch_of_learner更大的话,那么就更新Leader的epoch:epoch_of_leader = epoch_of_learner + 1;然后,LearnerHandler会进行等待,直到过半的Learner已经向Leader进行了注册,同时更新了epoch_of_leader之后,Leader就可以确定当前集群的epoch了。

(7)发送Leader状态。

    计算出新的epoch后,Leader会将该信息以一个LEADERINFO消息的形式发送给Learner,同时等待Learner的回应。

(8)Learner发送ACK信息。

      Follower在收到来自Leader的LEADERINFO消息后,会解析出epoch和ZXID,然后向Leader反馈一个ACKEPOCH响应。

(9)数据同步。

     Leader服务器收到Learner的这个ACK消息后,就可以开始与其进行数据同步了。具体数据同步后面再讲。

(10)启动Leader和Learner服务器。

     当有过半的Learner已经完成了数据同步,那么Leader和Learner服务器实例就可以开始启动了。

1.5 Leader和Follower启动

(1)创建并启动会话管理器。

(2)初始化ZooKeeper的请求处理链。

          和单机版服务器一样,集群模式下,每个服务器都会在启动阶段串联请求处理链,只是根据角色不同,会有不同的请求处理链。

(3)注册JMX服务。

至此,集群版的ZooKeeper服务器启动完毕。


参考 《从Paxos到ZooKeeper分布式一致性原理与实践》 

http://www.niftyadmin.cn/n/952595.html

相关文章

Zookeeper选举leader过程

1.1 Leader选举概述 Leader选举是ZooKeeper中最重要的技术之一&#xff0c;也是保证分布式数据一致性的关键所在。 1.1.1 服务器启动时期的Leader选举 我们讲解Leader选举的时候&#xff0c;隐式条件便是ZooKeeper的集群规模至少是2台机器&#xff0c;这里我们以3台机器组成的服…

分布式学习之Quorum机制和Lease机制

Quorum 机制 感觉这个名字很难读&#xff0c;今晚百度了一下&#xff0c;其实是这样的&#xff0c;腐朽的西方资本主义社会在举行选举时&#xff0c;通常要求参与人数必须达到额定的数量&#xff0c;才能成为一个法定有效的选举。这个额定的人数就 是Quorum&#xff0c;这是原始…

Jetty主要组件以及设计模式

官方文档&#xff1a; http://www.eclipse.org/jetty/documentation/current/architecture.html#basic-architecture jetty四大组件 server相当于元神&#xff0c;协调conector和handler工作&#xff0c;connector负责接受各种http连接请求的组件&#xff0c;handler就负责处理…

Jetty源码结构及启动过程

包下载&#xff1a; http://download.eclipse.org/jetty/ 包结构: 源码下载&#xff1a; https://github.com/eclipse/jetty.project/releases 代码模块&#xff1a; 启动方式&#xff1a; start.ini配置文件 这个默认配置决定启动了哪些模块以及启动的顺序。 这些配置文件的顺…

Jetty请求过程源码分析

BIO编程通常为每一个连接分配一个线程&#xff0c;由该线程通过InputStrem、OutputStream以“顺序”方式一路解析、处理、返回结构。这时候&#xff0c;线程数&#xff1d;&#xff1d;连接数。 NIO编程通常会建立一组驱动线程&#xff0c;每个线程驱动一个“状态对象”队列。这…

分布式原理介绍

一、回顾分布式特点 1.集中式特点 一台或多台计算机组成中心接节点&#xff0c;所有的数据都存在中心节点上。Client端只负责数据的展示&#xff0c;Server处理数据的存储和处理。显而易见&#xff0c;优点是结构简单容易部署&#xff0c;无需考虑服务多个节点部署&#xff0c;…

网络安全知识之DDOS介绍

一. 神马是DDOS&#xff1f;——野蛮且残忍的攻击方式 DOS&#xff1a;DOS的全称是Denial of Service&#xff0c;意思是拒绝服务。其目的就是通过各种手段&#xff0c;使网络或者计算机无法提供正常服务。 DDOS&#xff1a;DDOS攻击就是高级的DOS攻击&#xff0c;全称是Distri…

网络安全知识之SQL注入

SQL注入是什么 所谓SQL注入&#xff0c;就是通过把SQL命令插入到Web表单提交或输入域名或页面请求的查询字符串&#xff0c;最终达到欺骗服务器执行恶意的SQL命令。具体来说&#xff0c;它是利用现有应用程序&#xff0c;将&#xff08;恶意&#xff09;的SQL命令注入到后台数…