在分布式计算中,leader election 是很重要的一个功能, 这个选举过程是这样子的
- 指派一个进程作为组织者,将任务分发给各节点。在任务开始前,哪个节点都不知道谁是 leader 或者 coordinator
- 当选举算法开始执行后,每个节点最终会得到一个唯一的节点作为任务 leader。
- 除此之外,选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。
Curator 有两种选举 recipe(Leader Latch 和 Leader Election )
1.LeaderLatch
参与选举的所有节点,会创建一个顺序节点,其中最小的节点会设置为master节点, 没抢到Leader的节点都监听前一个节点的删除事件,在前一个节点删除后进行重新抢 主,当 master 节点手动调用 close 方法或者 master 节点挂了之后,后续的子节点会抢占master。 其中spark使用的就是这种方法。
实例被选为leader后,执行isLeader中的逻辑。当领导权易主之后才会再次执行isLeader。下面给出部分核心代码:
// 初始化 LeaderLatch
public void setLeaderLatch(String path) {
try {
String id = "client#" + InetAddress.getLocalHost().getHostAddress();
// client是具体的Curator客户端连接
leaderLatch = new LeaderLatch(client, path, id);
// isLeader 中的方法会在实例被选为主节点后被执行, 而notLeader中的不会被执行
// 如果主节点被失效, 会进行重新选主
LeaderLatchListener leaderLatchListener = new LeaderLatchListener() {
@Override
public void isLeader() {
logger.info("[LeaderLatch]我是主节点, id={}", leaderLatch.getId());
}
@Override
public void notLeader() {
logger.info("[LeaderLatch]我不是主节点, id={}", leaderLatch.getId());
}
};
leaderLatch.addListener(leaderLatchListener);
leaderLatch.start();
} catch (Exception e) {
logger.error("c创建LeaderLatch失败, path={}", path);
}
}
// 判断实例是否是主节点
public boolean hasLeadershipByLeaderLatch() {
return leaderLatch.hasLeadership();
}
// 阻塞直到获得领导权
public void awaitByLeaderLatch() {
try {
leaderLatch.await();
} catch (InterruptedException | EOFException e) {
e.printStackTrace();
}
}
// 尝试获得领导权并超时
public boolean awaitByLeaderLatch(long timeout, TimeUnit unit) {
boolean hasLeadership = false;
try {
hasLeadership = leaderLatch.await(timeout, unit);
} catch (InterruptedException e) {
e.printStackTrace();
}
return hasLeadership;
}
2.LeaderSelector
LeaderSelector和LeaderLatch最的差别在于,leader 可以释放领导权以后,还可以继续参与竞争。
当实例被选为leader之后,调用takeLeadership方法进行业务逻辑处理,处理完成即释放领导权。其中 autoRequeue 方法的调用确保此实例在释放领导权后还可能获得领导权。
public class SelectorDemo extends LeaderSelectorListenerAdapter implements Closeable {
private final String name;
private final LeaderSelector leaderSelector;
public SelectorDemo(CuratorFramework client, String name, String path) {
this.name = name;
// 利用一个给定的路径创建一个leader
// 执行leader选举的所有参与者对应路径不许一样
// 本例中 SelectorClient 也是一个LeaderSelectorListener ,但这不是必须的
this.leaderSelector = new LeaderSelector(client, path, this);
// 在大多数情况下,我们会希望一个selector放起leader后还要重新参与选举
leaderSelector.autoRequeue();
}
public void start() {
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
System.out.println(name + "现在是leader");
System.in.read(); // 阻塞,让当前获得leader权限的节点一直持有,直到该进程关闭
}
public static void main(String[] args) throws IOException {
CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().
connectString("43.105.136.120:2181").
sessionTimeoutMs(5000).
retryPolicy(new ExponentialBackoffRetry(1000, 3)).
build();
curatorFramework.start();
new SelectorDemo(curatorFramework, "ClientB", "/leader").start();
System.in.read();
}
}
本文标题:【Zookeeper】应用场景:实现Leader选举(LeaderLatch与LeaderSelector)
本文链接:https://blog.quwenai.cn/post/9775.html
版权声明:本文不使用任何协议授权,您可以任何形式自由转载或使用。






还没有评论,来说两句吧...