集群版服务器启动" style="margin:30px 0px 0px; padding:0px; color:rgb(51,51,51); font-size:20px; font-weight:normal; line-height:1.5; border-bottom-color:rgb(204,204,204); font-family:Arial,sans-serif"> 1.集群版服务器启动
集群版服务器启动流程如下图所示。
1.1 预启动
(1)统一由QuorumPeerMain作为启动类。
(2)解析配置文件zoo.cfg。
(3)创建并启动历史文件清理器DatadirCleanupManager。
(4)判断当前是集群模式还是单机模式的启动。
在集群模式中,由于已经在zoo.cfg中配置了多个服务器地址,因此此处会选择集群模式启动ZooKeeper。
public class QuorumPeerMain {
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);
private static final String USAGE = "Usage: QuorumPeerMain configfile";
protected QuorumPeer quorumPeer;
/**
* To start the replicated server specify the configuration file name on
* the command line.
* @param args path to the configfile
*/
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
} catch (IllegalArgumentException e) {
LOG.error("Invalid arguments, exiting abnormally", e);
LOG.info(USAGE);
System.err.println(USAGE);
System.exit(2);
} catch (ConfigException e) {
LOG.error("Invalid config, exiting abnormally", e);
System.err.println("Invalid config, exiting abnormally");
System.exit(2);
} catch (Exception e) {
LOG.error("Unexpected exception, exiting abnormally", e);
System.exit(1);
}
LOG.info("Exiting normally");
System.exit(0);
}
protected void initializeAndRun(String[] args)
throws ConfigException, IOException
{
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]); //解析配置文件zoo.cfg
}
// Start and schedule the the purge task //创建并启动历史文件清理器
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
if (args.length == 1 && config.servers.size() > 0) { //集群模式
runFromConfig(config);
} else { //单机模式
LOG.warn("Either no config or no quorum defined in config, running "
+ " in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args); // 集群模式
}
}
public void runFromConfig(QuorumPeerConfig config) throws IOException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
try {
//创建ServerCnxnFactory。
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
//初始化ServerCnxnFactory。
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
//创建QuorumPeer实例。
quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
//创建ZooKeeper数据管理器FileTnxSnapLog。
quorumPeer.setTxnFactory(new FileTxnSnapLog(
new File(config.getDataLogDir()),
new File(config.getDataDir())));
//设置集群机器节点
quorumPeer.setQuorumPeers(config.getServers());
//设置选举的算法
quorumPeer.setElectionType(config.getElectionAlg());
//设置机器的SID
quorumPeer.setMyid(config.getServerId());
//设置服务器tickTime和会话超时时间限制
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
//??
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
//设置选举的过半算法
quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
quorumPeer.setCnxnFactory(cnxnFactory);
//创建内存数据库ZKDatabase
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
//设置本机初始化角色
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
}
1.2 初始化
(1)创建ServerCnxnFactory。
(2)初始化ServerCnxnFactory。
(3)创建ZooKeeper数据管理器FileTnxSnapLog。
(4)创建QuorumPeer实例。
Quorum是集群模式下特有的现象,是ZooKe服务器实例(ZooKeeperServer)的托管者,从集群层面看,QuorumPeer代表了ZooKeeper集群中的一台机器。在运行期间,QuorumPeer会不断检测当前服务器实例的运行状态,同时根据情况发起Leader选举。
/**
* The servers that make up the cluster
*/
protected Map<Long, QuorumServer> quorumPeers;
public static class QuorumServer {
public InetSocketAddress addr;
public InetSocketAddress electionAddr;
public long id;
public LearnerType type = LearnerType.PARTICIPANT;
}
@Override
public synchronized void start() {
loadDataBase(); //恢复本地数据
cnxnFactory.start(); //进入NIOServerCnxnFactory的start方法 启动ServerCnxnFactory主线程
startLeaderElection(); //进入选举
super.start(); //此类继承了thread方法,所以会执行自己实现的run方法。
}
synchronized public void startLeaderElection() {
try {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
for (QuorumServer p : getView().values()) {
if (p.id == myid) {
myQuorumAddr = p.addr;
break;
}
}
if (myQuorumAddr == null) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = new QuorumCnxManager(this);
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
le = new FastLeaderElection(this, qcm);
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
(5)创建内存数据库ZKDatabase。
ZKdatebase是ZooKeeper的内存数据库,负责管理ZooKeeper的所有会话记录以及DataTree和事务日志的存储。
(6)初始化QuorumPeer。
QuorumPeer是ZooKeeperServer的托管者,需要将一些核心组件注册到QuorumPeer中去,包括FileTnxSnapLog、ServerCnxnFactory和ZKDatabase。同时ZooKe还会对QuorumPeer配置一些参数,包括服务器地址列表、Leader选举算法和会话超时时间限制等。
(7)恢复本地数据。
(8)启动ServerCnxnFactory主线程。
(9)启动QuorumPeer主线程。
@Override
public void run() {
setName("QuorumPeer" + "[myid=" + getId() + "]" +
cnxnFactory.getLocalAddress());
LOG.debug("Starting quorum peer");
... ...
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING: //开始选举
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
logFactory, this,
new ZooKeeperServer.BasicDataTreeBuilder(),
this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING: //作为观测者
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
setPeerState(ServerState.LOOKING);
}
break;
case FOLLOWING: //作为Follower
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
case LEADING: //作为leader
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
setPeerState(ServerState.LOOKING);
}
break;
}
}
} finally {
LOG.warn("QuorumPeer main thread exited");
try {
MBeanRegistry.getInstance().unregisterAll();
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
jmxQuorumBean = null;
jmxLocalPeerBean = null;
}
}
1.3 Leader选举
(1) 初始化Leader选举。
Leader选举可以说是集群和单机模式启动ZooKeeper最大的不同点。ZooKeeper首先会根据自身的SID(服务器ID)、lastLoggedZxid(最新的ZXID)和当前的服务器epoch(currentEpoch)来生成一个初始化的投票。简单地讲,在初始化过程中,每个服务器都会给自己投票。然后,ZooKeeper会根据zoo.cfg中的配置,创建相应的Leader选举算法实现。
在初始化阶段,ZooKeeper会创建Leader选举所需要的网络I/O层QuorumCnxManager,同时启动对Leader选举端口的监听,等待集群中其他服务器创建连接。
(2) 注册JMX服务。
(3)检测当前服务器状态。
上文已经提到,QuorumPeer是ZooKeeper服务器实例的托管者,在运行期间,QuorumPeer的核心工作就是不断地检测当前服务器的状态,并作出相应的处理。在正常情况下,ZooKeeper服务器的状态在LOOKING、LEADING和FOLLOWING/OBSERVING之间切换。而在启动阶段,QuorumPeer的初始化状态是LOOKING,因此开始进行Leader选举。
(4)Leader选举。
ZooKeeper的Leader选举过程,简单地讲,就是一个集群中所有机器相互之间进行一系列投票,选举出最适合的机器成为Leader,同时其他机器成为Follower或是Observer的集群及其角色初始化过程。关于机器选举算法,简而言之,就是集群中哪个机器处理的数据越新(通常我们根据每个服务器处理过的最大ZXID来比较确定其数据是否更新),其越有可能成为Leader。
1.4 Leader和Follower启动期交互过程
到这里为止,ZooKeeper已经完成了Leader选举,并且集群中每个服务器都已经确定了自己的角色。通常情况下就分为Leader和Follower两种角色。Leader和Follower在启动期间的大致交互流程如下图。
(1)创建Leader服务器和Follower服务器。
完成Leader选举之后,每个服务器都会根据自己的服务器角色创建相应的服务器实例,并开始进入各自角色的主流程。
(2)Leader服务器启动Follower接收器LearnerCnxAcceptor。
在ZooKeeper集群运行期间,Leader服务器需要和所有其余的服务器(Learner)保持连接以确定集群的机器存活情况。LearnerCnxAcceptor接收器用于负责接收所有非Leader服务器的连接请求。
(3)Learner服务器开始和Leader建立连接。
所有的Learner服务器在启动完毕后,会从Leader选举的投票结果中找到当前集群中的Leader服务器,然后与其建立连接。
(4)Leader服务器创建LearnerHandler。
Leader接收来自其他机器的连接创建请求后,会创建一个LearnerHandler实例。每个LearnerHandler实例都对应了一个Leader与Learner服务器之间的连接,其负责Leader和Learner服务器之间机会所有的消息通信和数据同步。
(5)向Leader注册。
当和Leader建立连接后,Learner就会开始向Leader进行注册。所谓的注册就是将Learner服务器自己的基本信息发送给Leader服务器。这些基本信息我们称之为LearnerInfo,包括当前服务器的SID和服务器最新的ZXID。
(6)Leader解析Learner信息,计算新的epoch。
Leader服务器在接收到Learner的基本信息后,会解析出该Learner的SID和ZXID,然后根据该Learner的ZXID解析出其对应的epoch_of_learner,和当前Leader的服务器的epoch_of_leader进行比较,如果改Learner的epoch_of_learner更大的话,那么就更新Leader的epoch:epoch_of_leader = epoch_of_learner + 1;然后,LearnerHandler会进行等待,直到过半的Learner已经向Leader进行了注册,同时更新了epoch_of_leader之后,Leader就可以确定当前集群的epoch了。
(7)发送Leader状态。
计算出新的epoch后,Leader会将该信息以一个LEADERINFO消息的形式发送给Learner,同时等待Learner的回应。
(8)Learner发送ACK信息。
Follower在收到来自Leader的LEADERINFO消息后,会解析出epoch和ZXID,然后向Leader反馈一个ACKEPOCH响应。
(9)数据同步。
Leader服务器收到Learner的这个ACK消息后,就可以开始与其进行数据同步了。具体数据同步后面再讲。
(10)启动Leader和Learner服务器。
当有过半的Learner已经完成了数据同步,那么Leader和Learner服务器实例就可以开始启动了。
1.5 Leader和Follower启动
(1)创建并启动会话管理器。
(2)初始化ZooKeeper的请求处理链。
和单机版服务器一样,集群模式下,每个服务器都会在启动阶段串联请求处理链,只是根据角色不同,会有不同的请求处理链。
(3)注册JMX服务。
至此,集群版的ZooKeeper服务器启动完毕。