Zookeeper源码解读-程序员宅基地

技术标签: java  源码分析  zookeeper  

Zookeeper源码解读(一)

高可用保障.选举

相信有几年工作经验的技术人员都有自己的知识库,遇到问题首先大脑会快速索引一下去自己的知识库找解决方案,再次去官网或gg。本着技术人员传播分享的优良传统,试着分享一下zk的源码中的选举流程

zookeeper作为分布式系统常用的中间件,其特点和适应场景相信大家都清楚,比如kafka使用zk做leader选举。那么zk自己的leader选举又是怎样的呢?

下载源码,找到zkServer.sh

找到ZOOMAIN入口类

ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"

其中QuorumPeerMain的initializeAndRun()方法


    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
    {
    
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
    
            config.parse(args[0]);
        }

        // Start and schedule the the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();
        //判断是否集群模式
        if (args.length == 1 && config.isDistributed()) {
    
            runFromConfig(config);
        } else {
    
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

单机模式就不用再往下看了,没选举什么事,集群模式继续往下走runFromConfig(config)方法

    public void runFromConfig(QuorumPeerConfig config)
            throws IOException, AdminServerException {
    
      ....
      try {
    
          ServerCnxnFactory cnxnFactory = null;
          ServerCnxnFactory secureCnxnFactory = null;
          //为客户端提供读写的server, 2181。集群配置中的另一个端口是选举时的通信端口
          if (config.getClientPortAddress() != null) {
    
              cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                      config.getMaxClientCnxns(),
                      false);
          }

          if (config.getSecureClientPortAddress() != null) {
    
              secureCnxnFactory = ServerCnxnFactory.createFactory();
              secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                      config.getMaxClientCnxns(),
                      true);
          }
          //QuorumPeer类选举、投票实现
          quorumPeer = getQuorumPeer();
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      config.getDataLogDir(),
                      config.getDataDir()));
          quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
          quorumPeer.enableLocalSessionsUpgrading(
              config.isLocalSessionsUpgradingEnabled());
          //quorumPeer.setQuorumPeers(config.getAllMembers());
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setConfigFileName(config.getConfigFilename());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
          if (config.getLastSeenQuorumVerifier()!=null) {
    
              quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
          }
          quorumPeer.initConfigInZKDatabase();
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          //
          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
    
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

去QuorumPeer类中就能找到投票、选举的实现逻辑

@Override
    public synchronized void start() {
    
        if (!getView().containsKey(myid)) {
    
            throw new RuntimeException("My id " + myid + " not in the peer list");
         }
        loadDataBase();  //此方法主要是从本地文件中最终得到zxid的逻辑
        startServerCnxnFactory();
        try {
    
            adminServer.start();
        } catch (AdminServerException e) {
    
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        startLeaderElection();//开始选举
        super.start();
    }

跟进startLeaderElection()

protected Election createElectionAlgorithm(int electionAlgorithm){
    
        Election le=null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
    
            case 1:
                le = new AuthFastLeaderElection(this);
                break;
            case 2:
                le = new AuthFastLeaderElection(this, true);
                break;
            case 3:
                //Leader选举manager类,管理节点间选举通知的一些操作
                qcm = new QuorumCnxManager(this);
                QuorumCnxManager.Listener listener = qcm.listener;
                if(listener != null){
    
                    listener.start(); //启动选举线程
                    FastLeaderElection fle = new FastLeaderElection(this, qcm);
                    fle.start();
                    le = fle;
                } else {
    
                    LOG.error("Null listener when initializing cnx manager");
                }
                break;
            default:
                assert false;
        }
        return le;
    }

进入FastLeaderElection类,看主要执行逻辑

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
    
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
         // 1.若epoch大,为leader
         // 2.若epoch相等,zxid大,为leader
         // 3.若zxid相等,则比较myid(配置文件中)

        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

totalOrderPredicate()方法在见lookForLeader();

public Vote lookForLeader() throws InterruptedException {
    
    ...
        try {
    
            //存放其它节点的投票
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
            //存放选举
            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = finalizeWait;

            synchronized(this){
    
                logicalclock.incrementAndGet(); //增加逻辑时钟
                //更新自己的zxid和epoch
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
            sendNotifications(); //通知投票

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
    //loop直到选举出leader
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                // 拿到投票消息
                //recvqueue->LinkedBlockedQueue()
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                if(n == null){
    
                    if(manager.haveDelivered()){
    
                        sendNotifications();
                    } else {
    
                   //等待其它节点就绪?
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    int tmpTimeOut = notTimeout*2;
                    notTimeout = (tmpTimeOut < maxNotificationInterval?
                            tmpTimeOut : maxNotificationInterval);
                    LOG.info("Notification time out: " + notTimeout);
                }
                //判断收到的消息是不是属于这个集群内,n.sid就是我们配置的myid
                else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
    
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view.
                     */
                    switch (n.state) {
    //节点的状态
                    case LOOKING:
                        if (getInitLastLoggedZxid() == -1) {
    
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
    
                            LOG.debug("Ignoring notification from member with -1 zxid" + n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                        //epoch大于logicalclock,则为是新一轮选举
                        if (n.electionEpoch > logicalclock.get()) {
    
                            logicalclock.set(n.electionEpoch); //更新本地的logicalclock
                            recvset.clear();
                            //比较本轮是否可以胜出(是比较传递过来的票据)
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
    
                                //若胜出,把投票改为对方的票据
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
    //否则,票据不变
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();//将本身的票据广播到集群节点
                            //若收到的epoch小于本身节点的epoch,则忽略
                        } else if (n.electionEpoch < logicalclock.get()) {
    
                            if(LOG.isDebugEnabled()){
    
                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break;
                            //若epoch相同,则zxid、myid,如果本轮胜出,更新自己的票据,通知其他节点
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
    
                            updateProposal(n.leader, n.zxid, n.peerEpoch);//更新
                            sendNotifications();//通知其他节点
                        }

                        if(LOG.isDebugEnabled()){
    
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }
                        //存储收到的投票
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        //过半节点一致选举leader则选举结束
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {
    
                            // Verify if there is any change in the proposed leader
                            //等待队列中的消息
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
    
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
    
                                    recvqueue.put(n);
                                    break;
                                }
                            }
                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            //选举LEADER
                            if (n == null) {
    
                                //LEADING | FOLLOWING
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                //最终投票结果
                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                        //OBSERVING角色不参加选举
                    case OBSERVING:
                        LOG.debug("Notification from observer: " + n.sid);
                        break;
                        //FOLLOWING和LEADING节点都需要参与选举
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        if(n.electionEpoch == logicalclock.get()){
     //判断epoch是否相同
                            //加入到本机的投票集合
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            //投票是否结束,如果结束,再确认LEADER是否有效
                            //如果结束,修改自己的状态并返回投票结果
                            if(termPredicate(recvset, new Vote(n.leader,
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
    
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                       ...
    }

总结:投票选举流程即,比较集群内节点zxid,myid,统计投票结果得出leader。
附本人平时的一些笔记,欢迎私信分享
在这里插入图片描述

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u013620857/article/details/85818479

智能推荐

使用nginx解决浏览器跨域问题_nginx不停的xhr-程序员宅基地

文章浏览阅读1k次。通过使用ajax方法跨域请求是浏览器所不允许的,浏览器出于安全考虑是禁止的。警告信息如下:不过jQuery对跨域问题也有解决方案,使用jsonp的方式解决,方法如下:$.ajax({ async:false, url: 'http://www.mysite.com/demo.do', // 跨域URL ty..._nginx不停的xhr

在 Oracle 中配置 extproc 以访问 ST_Geometry-程序员宅基地

文章浏览阅读2k次。关于在 Oracle 中配置 extproc 以访问 ST_Geometry,也就是我们所说的 使用空间SQL 的方法,官方文档链接如下。http://desktop.arcgis.com/zh-cn/arcmap/latest/manage-data/gdbs-in-oracle/configure-oracle-extproc.htm其实简单总结一下,主要就分为以下几个步骤。..._extproc

Linux C++ gbk转为utf-8_linux c++ gbk->utf8-程序员宅基地

文章浏览阅读1.5w次。linux下没有上面的两个函数,需要使用函数 mbstowcs和wcstombsmbstowcs将多字节编码转换为宽字节编码wcstombs将宽字节编码转换为多字节编码这两个函数,转换过程中受到系统编码类型的影响,需要通过设置来设定转换前和转换后的编码类型。通过函数setlocale进行系统编码的设置。linux下输入命名locale -a查看系统支持的编码_linux c++ gbk->utf8

IMP-00009: 导出文件异常结束-程序员宅基地

文章浏览阅读750次。今天准备从生产库向测试库进行数据导入,结果在imp导入的时候遇到“ IMP-00009:导出文件异常结束” 错误,google一下,发现可能有如下原因导致imp的数据太大,没有写buffer和commit两个数据库字符集不同从低版本exp的dmp文件,向高版本imp导出的dmp文件出错传输dmp文件时,文件损坏解决办法:imp时指定..._imp-00009导出文件异常结束

python程序员需要深入掌握的技能_Python用数据说明程序员需要掌握的技能-程序员宅基地

文章浏览阅读143次。当下是一个大数据的时代,各个行业都离不开数据的支持。因此,网络爬虫就应运而生。网络爬虫当下最为火热的是Python,Python开发爬虫相对简单,而且功能库相当完善,力压众多开发语言。本次教程我们爬取前程无忧的招聘信息来分析Python程序员需要掌握那些编程技术。首先在谷歌浏览器打开前程无忧的首页,按F12打开浏览器的开发者工具。浏览器开发者工具是用于捕捉网站的请求信息,通过分析请求信息可以了解请..._初级python程序员能力要求

Spring @Service生成bean名称的规则(当类的名字是以两个或以上的大写字母开头的话,bean的名字会与类名保持一致)_@service beanname-程序员宅基地

文章浏览阅读7.6k次,点赞2次,收藏6次。@Service标注的bean,类名:ABDemoService查看源码后发现,原来是经过一个特殊处理:当类的名字是以两个或以上的大写字母开头的话,bean的名字会与类名保持一致public class AnnotationBeanNameGenerator implements BeanNameGenerator { private static final String C..._@service beanname

随便推点

二叉树的各种创建方法_二叉树的建立-程序员宅基地

文章浏览阅读6.9w次,点赞73次,收藏463次。1.前序创建#include&lt;stdio.h&gt;#include&lt;string.h&gt;#include&lt;stdlib.h&gt;#include&lt;malloc.h&gt;#include&lt;iostream&gt;#include&lt;stack&gt;#include&lt;queue&gt;using namespace std;typed_二叉树的建立

解决asp.net导出excel时中文文件名乱码_asp.net utf8 导出中文字符乱码-程序员宅基地

文章浏览阅读7.1k次。在Asp.net上使用Excel导出功能,如果文件名出现中文,便会以乱码视之。 解决方法: fileName = HttpUtility.UrlEncode(fileName, System.Text.Encoding.UTF8);_asp.net utf8 导出中文字符乱码

笔记-编译原理-实验一-词法分析器设计_对pl/0作以下修改扩充。增加单词-程序员宅基地

文章浏览阅读2.1k次,点赞4次,收藏23次。第一次实验 词法分析实验报告设计思想词法分析的主要任务是根据文法的词汇表以及对应约定的编码进行一定的识别,找出文件中所有的合法的单词,并给出一定的信息作为最后的结果,用于后续语法分析程序的使用;本实验针对 PL/0 语言 的文法、词汇表编写一个词法分析程序,对于每个单词根据词汇表输出: (单词种类, 单词的值) 二元对。词汇表:种别编码单词符号助记符0beginb..._对pl/0作以下修改扩充。增加单词

android adb shell 权限,android adb shell权限被拒绝-程序员宅基地

文章浏览阅读773次。我在使用adb.exe时遇到了麻烦.我想使用与bash相同的adb.exe shell提示符,所以我决定更改默认的bash二进制文件(当然二进制文件是交叉编译的,一切都很完美)更改bash二进制文件遵循以下顺序> adb remount> adb push bash / system / bin /> adb shell> cd / system / bin> chm..._adb shell mv 权限

投影仪-相机标定_相机-投影仪标定-程序员宅基地

文章浏览阅读6.8k次,点赞12次,收藏125次。1. 单目相机标定引言相机标定已经研究多年,标定的算法可以分为基于摄影测量的标定和自标定。其中,应用最为广泛的还是张正友标定法。这是一种简单灵活、高鲁棒性、低成本的相机标定算法。仅需要一台相机和一块平面标定板构建相机标定系统,在标定过程中,相机拍摄多个角度下(至少两个角度,推荐10~20个角度)的标定板图像(相机和标定板都可以移动),即可对相机的内外参数进行标定。下面介绍张氏标定法(以下也这么称呼)的原理。原理相机模型和单应矩阵相机标定,就是对相机的内外参数进行计算的过程,从而得到物体到图像的投影_相机-投影仪标定

Wayland架构、渲染、硬件支持-程序员宅基地

文章浏览阅读2.2k次。文章目录Wayland 架构Wayland 渲染Wayland的 硬件支持简 述: 翻译一篇关于和 wayland 有关的技术文章, 其英文标题为Wayland Architecture .Wayland 架构若是想要更好的理解 Wayland 架构及其与 X (X11 or X Window System) 结构;一种很好的方法是将事件从输入设备就开始跟踪, 查看期间所有的屏幕上出现的变化。这就是我们现在对 X 的理解。 内核是从一个输入设备中获取一个事件,并通过 evdev 输入_wayland

推荐文章

热门文章

相关标签