Curator leader选举


背景

最近一周,一直在做squirrel-ha-service高可用的改进。简单介绍下squirrel-ha-service。squirrel-ha-service(后文都简写为ha)是线上持续监控redis集群,保证redis集群高可用的一个服务。会通过redis cluster nodes命令获取redis集群所有节点的状态,如果某个节点宕机了,ha一旦发现宕机节点,首先会通过修改zk通知redis客户端不再访问该节点,然后自动替换宕机节点。替换完节点后,再次通过修改zk通知redis客户端刷新本地路由,将新添加的节点加入本地路由表。
  
上述左图是我们目前线上ha大致架构,基本能保证redis集群的整体可用性。但是最近我们在做机房容灾相关,这个架构就有问题了。

上述是右图我们线上一个redis集群的部署情况,基本都是3机房部署。图中是dx,yf,gh三个机房,每个机房都有一个主节点一个从节点。redis集群这样部署,能保证最高的可用性。即使线上某个机房发生故障,剩下的两个机房也能继续提供服务。
这里我们假设现在gh机房发生网络故障,先忽视图中红色框。gh-master节点原来有一个dx-slave节点,一旦光环机房不可用,dx-slave节点会发起提升自己为master的请求,dx和yf机房的两个主节点投票通过,这样一个新的集群dx(2个master节点),yf(一个master,一个slave)可以继续对外提供服务。
从redis集群角度看,如果机房是三机房主从均匀部署,单个机房发生故障另外两个机房依然能继续提供服务。但是此时我们的squirrel-ha服务因为也处于gh机房(我们线上一个redis集群唯一对应一个ha监控服务),由于gh机房和另外两个机房dx,yf网络不通,此时ha服务无法刷新zk,通知dx和yf的redis客户端更新路由了。之所以发生这个问题,是因为ha自身没有保证高可用,所以我们考虑引进zk选举来保证ha服务的可用性。如上图红框,当gh机房发生网络故障,之前dx-ha-watcher会替代gh-ha成为新的leader,开始监控redis集群。

Curator选主

上面我们说了,在发生机房网络分区时候,ha自身不能保证高可用。所以接下来我们会将ha接入zk,通过zk的选举功能保证ha自身的高可用。我们会使用Netflix开源的curator来实现选主,这个框架解决了原生zookeeper client断线重连相关问题,并且提供了2套选主方案。

  • LeaderLatch:随机从候选着中选出一台作为leader,选中之后除非调用close()释放leadship,否则其他的后选择无法成为leader。这种策略适合主备应用,当主节点意外宕机之后,多个从节点会自动选举其中一个为新的主节点
  • Leader Election:这种选举策略跟Leader Latch选举策略不同之处在于每个实例都能公平获取领导权,而且当获取领导权的实例在释放领导权之后,该实例还有机会再次获取领导权。另外,选举出来的leader不会一直占有领导权,当 takeLeadership(CuratorFramework client) 方法执行结束之后会自动释放领导权。

具体选择哪种策略,还是要用户根据自己的需求选择。

LeaderLatch

public class ZKLeaderUtils {

    private static String KEY_LEADER_PATH = "/test";

    public static void main(String[] args) throws Exception {
        List<LeaderLatch> list = new ArrayList<>();
        final List<CuratorFramework> clients = new ArrayList<>();
        for(int i=0; i<3; i++){
            CuratorFramework curatorClient = CuratorFrameworkFactory.newClient("localhost:2181", 60 * 1000, 30 * 1000,
                    new RetryNTimes(3, 1000));
            curatorClient.start();
            final LeaderLatch leaderLatch = new LeaderLatch(curatorClient,KEY_LEADER_PATH, String.valueOf(i));
            leaderLatch.addListener(new LeaderLatchListener() {
                @Override
                public void isLeader() {
                    System.out.println(leaderLatch.getId() +  ":I am leader. I am doing jobs!");
                }

                @Override
                public void notLeader() {
                    System.out.println(leaderLatch.getId() +  ":I am not leader. I will do nothing!");
                }
            });
            leaderLatch.start();
            list.add(leaderLatch);
            clients.add(curatorClient);
        }

        Thread.sleep(2000);
        int i=0;
        for(LeaderLatch leaderLatch : list){
            leaderLatch.close();
            clients.get(i).close();
            i++;
        }

        Thread.sleep(100000);
    }
}

上述代码,创建了3个LeaderLatch实例,然后sleep 2s,让3个实例进行选主。最后依次调用close方法,释放leader。

控制台会随机输出:
2:I am leader. I am doing jobs!
重复执行几次,可以看到不同的client随机获得leader。

Leader Election

private static class ExampleClient extends LeaderSelectorListenerAdapter{
    private volatile boolean isLeaderRelease = false;
    private LeaderSelector leaderSelector;

    ExampleClient(CuratorFramework client, String path){
        leaderSelector = new LeaderSelector(client, path, this);
        //释放leader后,还可以重新获取leader
        leaderSelector.autoRequeue();
    }

    public void start(){
        //leader选举会在后台线程进行,该方法会立即返回
        leaderSelector.start();
    }

    /**
     * 能进入到takeLeadership方法,表示leaderSelector已经成为leader了。只要该方法不退出或者
     * 不发生其他异常,就一直持有leader
     * @param client
     * @throws Exception
     */
    public void takeLeadership(CuratorFramework client) throws Exception {
        while (!isLeaderRelease){
            Thread.sleep(1000);
        }
    }

    public boolean isLeader() {
        return leaderSelector.hasLeadership();
    }

    /**
     * 设置isLeaderRelease 为false,让takeLeadership方法退出,这样leader就释放了。
     * 注意在成功释放leader后,我们又重新设置isLeaderRelease 为true,这样再次获得
     * leader后,如果不主动释放或者出现其他异常情况,又可以一直持有leader。
     */
    public void releaseLeader() {
        this.isLeaderRelease = true;
        while (leaderSelector.hasLeadership()){
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignore) {}
        }
        this.isLeaderRelease = false;
    }
}

上述代码ExampleClient主要关注以下四点

  • 构造方法中leaderSelector.autoRequeue();这个确保了leaderSelector在释放leader后,还可以重新获取leader。
  • takeLeaderShip方法,一旦进入这个方法,就表示leaderSelector已经成为leader,从这个方法退出,就释放leader。可以看到这就同LeaderLatch不一样,LeaderLatch只能通过主动close释放leader。
  • releaseLeader方法我们通过设置isLeaderRelease为true,让takeLeaderShip能退出循环,达到释放leader目的。
  • 通过继承了LeaderSelectorListenerAdapter类,一旦出现SUSPENDED或者LOST连接问题,能主动释放leader,这个下面会详细说下。

LeaderSelectorListenerAdapter

public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener
{
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        if ( (newState == ConnectionState.SUSPENDED) || (newState == ConnectionState.LOST) )
        {
            throw new CancelLeadershipException();
        }
    }
}

一旦LeaderSelector启动,它会向curator客户端添加监听器。 使用LeaderSelector必须时刻注意连接的变化。一旦出现连接问题如SUSPENDED,或者LOST,curator实例必须确保其不再是leader并且其takeLeadership()应该直接退出。

推荐的做法是,如果发生SUSPENDED或者LOST连接问题,最好直接抛CancelLeadershipException,此时,leaderSelector实例会尝试中断并且取消正在执行takeLeadership()方法的线程。 建议扩展LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已经提供了推荐的处理方式 。

LeaderSelector

private static class WrappedListener implements LeaderSelectorListener
{
    private final LeaderSelector leaderSelector;
    private final LeaderSelectorListener listener;

    public WrappedListener(LeaderSelector leaderSelector, LeaderSelectorListener listener)
    {
        this.leaderSelector = leaderSelector;
        this.listener = listener;
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception
    {
        listener.takeLeadership(client);
    }

    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState)
    {
        try
        {
            listener.stateChanged(client, newState);
        }
        catch ( CancelLeadershipException dummy )
        {
            leaderSelector.interruptLeadership();
        }
    }
}
    
/**
 * Attempt to cancel and interrupt the current leadership if this instance has leadership
 */
public synchronized void interruptLeadership()
{
    Future<?> task = ourTask.get();
    if ( task != null )
    {
        task.cancel(true);
    }
}

可以看到一旦catch到listener.stateChanged抛出的CancelLeadershipException异常,会调用leaderSelector.interruptLeadership()尝试中断,所以我们上面的ExampleClient的takeLeaderShip方法必须要是可以响应中断的

/**
 * 能进入到takeLeadership方法,表示leaderSelector已经成为leader了。只要该方法不退出或者
 * 不发生其他异常,就一直持有leader
 * @param client
 * @throws Exception
 */
public void takeLeadership(CuratorFramework client) throws Exception {
    while (!isLeaderRelease){
        Thread.sleep(1000);
    }
}

上述方法Thread.sleep确实可以响应中断,所以一旦出现SUSPENDED或者LOST连接问题,就会从takeLeaderShip方法退出并释放leader。

下面我们来测试下上述的ExampleClient

public static void main(String[] args) throws InterruptedException {
    List<ExampleClient> clients = new ArrayList<>();
    for(int i=0; i<3; i++){
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();
        final ExampleClient exampleClient = new ExampleClient(client, "/test");
        exampleClient.start();
        clients.add(exampleClient);
        final int index = i;

        new Thread(new Runnable() {
            @Override
            public void run() {
                //外层while(true)循环防止exampleClient还没有获取leader,就直接退出线程了
                while (true){
                    //内层循环如果成为leader就打印日志
                    while (exampleClient.isLeader()){
                        System.out.println("haLeaderService " + index + " is leader");
                        try {
                            Thread.sleep(2000);
                        } catch (InterruptedException ignore) {}
                    }

                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ignore) {}
                }
            }
        }).start();

        Thread.sleep(2000);
    }

    while (true){
        Thread.sleep(7000);
        for(ExampleClient exampleClient : clients){
            if(exampleClient.isLeader()){
                exampleClient.releaseLeader();
                break;
            }
        }
    }
}

上述我们创建了3个ExampleClient,如果成为leader就会打印日志。后续每隔7s,又会主动释放leader,这样其他follower就会成为leader。

控制台输出如下:

haLeaderService 0 is leader
haLeaderService 0 is leader
haLeaderService 0 is leader
haLeaderService 0 is leader
haLeaderService 1 is leader
haLeaderService 1 is leader
haLeaderService 1 is leader
haLeaderService 1 is leader
haLeaderService 2 is leader
haLeaderService 2 is leader
haLeaderService 2 is leader
haLeaderService 2 is leader
haLeaderService 0 is leader
haLeaderService 0 is leader
haLeaderService 0 is leader

可以看到每隔7s,leader确实会切换一次。

zk terminal
上图是我们通过zkCli命令连接到zk server获取的信息。我们创建3个ExampleClient,会在/test节点下面分别创建3个临时节点,观察后面的数字0019,0020,0021。
其实Leader Election内部通过一个分布式锁来实现选主;并且选主结果是公平的,zk会按照各节点请求的次序成为主节点,当前最小序号的节点成为主节点,其他节点会添加一个对于当前最小节点的监听watcher。一旦发现最小节点不存在,第二小的节点就会成为leader。


文章作者: 叶明
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 叶明 !
评论
 上一篇
从零开始学TIDB一(搭建TIDB集群) 从零开始学TIDB一(搭建TIDB集群)
本文介绍如何在单机上(Mac OS)通过 Docker Compose 快速一键部署一套 TiDB 测试集群,我们会从零开始搭建一套由3 个 PD,3 个 TiKV,1 个 TiDB构成的集群。主要包括以下两个步骤: Mac OS系统上安
2018-08-26
下一篇 
Dubbo源码学习四(Consumer,Provider通信) Dubbo源码学习四(Consumer,Provider通信)
之前我们介绍了Provider启动时候向注册中心注册服务Dubbo源码学习二(服务注册),Consumer启动时候向注册中心订阅服务Dubbo源码学习三(服务引用),本文我们就来看下Consumer端和Provider是怎么通信的。
2018-07-31
  目录