ZooKeeper进行领导者选举是比较容易的。
伪代码表示:
zkclient:
<1>判定是否存在/zxeample/leader路径
<2>如果不存在,那么试图创建一个会话znode(Ephemeral Path)(path = /zxeample/leader,data=client id)
<2.1>创建成功,标识自己是leader
<2.2>创建不成功(包括异常)转向<1>
<3>如果存在path=/zxeample/leader,标识自己是slave,(可能需要与leader进行通信)
<4>如果自己是slave,那么监控该znode的data change事件。(用于当leader挂了,事件通知模型,就会产生事件触发通知,从而进行新的选举领导者)
基于java开源org.I0Itec.zkclient库实现,更简单。kafka也是基于这个实现leader选举的,不过是scala写的。
测试方法:
(1)启动ZooKeeper server
(2)启动zkCli
(3)启动程序,
构建10个线程,每个线程都是一个ZkClient,
(4)然后在zkCli中,使用命令rmr /zxexample/leader
总结:尚有2个不如人意之处.创建znode有冲突,因为存在多个client同时创建,单只有一个成功,其余失败(逻辑正确),但是会打印很多异常。第二,线程是用sleep,因此,其实是一直在循环,即轮询,而没有消息驱动的方式。
package zkexam; import java.security.SecureRandom; import java.util.concurrent.Callable; import org.I0Itec.zkclient.IZkDataListener; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; /** * choose a server as a Leader(Master),while other servers are slaves. * * @author Free * */ public class ServerElect { SecureRandom rand = new SecureRandom(); public ServerElect() { } public static class Leader { ZkClient leader; // byte[] data; public ZkClient getClient() { return leader; } public void setClient(ZkClient leaderClient) { this.leader = leaderClient; } } Leader selectLeader(ZkClient... client) { if (client == null || client.length < 0) { throw new IllegalArgumentException( "no zookeeper client need to be selected as leader."); } Leader leader = new Leader(); do { int i = rand.nextInt() % (client.length); try { client[i].createEphemeral("/zxexample/leader", "I am leader " + i); leader.setClient(client[i]); for (int j = 0; j < client.length && j != i; j++) { } break; } catch (Exception e) { e.printStackTrace(); } } while (true); return leader; } public class MyWatcher<T> implements Watcher { Callable<T> callback; MyWatcher(Callable<T> c) { callback = c; } @Override public void process(WatchedEvent event) { org.apache.zookeeper.Watcher.Event.EventType eventType = event .getType(); switch (eventType) { case NodeDeleted: try { callback.call(); } catch (Exception e) { e.printStackTrace(); } break; default: break; } } } public static class LeaderChangeListener implements IZkDataListener { ZkClient client; public LeaderChangeListener(ZkClient client_) { client = client_; } /** * Called when the leader information stored in zookeeper has changed. Record the new leader in memory * * @throws Exception * On any error. */ public void handleDataChange(String dataPath, Object data) { System.out.println("a new leader is elected."); } @Override public void handleDataDeleted(String dataPath) throws Exception { System.out.println(dataPath + ":data is deleted."); } } public static class zkClientThread extends Thread { final static String path = "/zxexample/leader"; ZkClient client; long maxMsToWaitUntilConnected; volatile boolean isFirstTime = true; volatile boolean isLeader; String data; // Watcher watcher; public zkClientThread(ZkClient client_, String name) { super(name); client = client_; } public void start() { super.start(); } public void tryLeader() { try { data = getName(); if (!client.exists(path)) { try { client.createEphemeral(path, data); } catch (ZkNoNodeException e) { String parentDir = path.substring(0, path.lastIndexOf('/')); if (parentDir.length() != 0) { client.createPersistent(parentDir, true); } client.createEphemeral(path, data); } isLeader = true; System.out.println("I am leader :" + getName()); } } catch (Exception e) { e.printStackTrace(); isFirstTime = true; isLeader = false; } } public void run() { while (true) { if (client.exists(path)) { if (isFirstTime) { Object obj = client.readData(path); if (obj == null || !obj.toString().equals(getName())) { tryLeader(); } else { // client.subscribeDataChanges(path, // new LeaderChangeListener(client)); // wait leader ,and communication to leader; client.watchForData(path); } isFirstTime = false; } } else { tryLeader(); } try { Thread.sleep(1000); } catch (InterruptedException e) { break; } } } } public static void main(String args[]) { int curClientCount = 10; ZkClient[] client = new ZkClient[curClientCount]; zkClientThread[] zkThreads = new zkClientThread[curClientCount]; for (int i = 0; i < curClientCount; i++) { client[i] = new ZkClient("127.0.0.1:2181", 218100); zkThreads[i] = new zkClientThread(client[i], "zk-" + i); } for (int i = 0; i < zkThreads.length; i++) { zkThreads[i].start(); } } }
I am leader :zk-6 I am leader :zk-5 I am leader :zk-6 org.I0Itec.zkclient.exception.ZkNodeExistsException: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zxexample/leader at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:55) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685) at org.I0Itec.zkclient.ZkClient.create(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.createEphemeral(ZkClient.java:328) at zkexam.ServerElect$zkClientThread.tryLeader(ServerElect.java:141) at zkexam.ServerElect$zkClientThread.run(ServerElect.java:169) Caused by: org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /zxexample/leader at org.apache.zookeeper.KeeperException.create(KeeperException.java:119) at org.apache.zookeeper.KeeperException.create(KeeperException.java:51) at org.apache.zookeeper.ZooKeeper.create(ZooKeeper.java:783) at org.I0Itec.zkclient.ZkConnection.create(ZkConnection.java:87) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:308) at org.I0Itec.zkclient.ZkClient$1.call(ZkClient.java:304) at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675) ... 4 more I am leader :zk-3
相关推荐
在分布式环境中实现Leader选举、互斥锁和读写锁通常涉及到协调服务,如etcd、Zookeeper或Consul。这些服务提供了必要的原语来处理节点间的协调和数据一致性。以下是实现这些功能的一般步骤: Leader选举: 使用协调...
fast paxos算法与zookeeper leader选举源代码分析.doc
本文详细分析了Zookeeper的源码,特别是Leader选举过程的实现。首先,介绍了阅读源码的意义,包括技术提升、框架掌握、问题定位、面试准备、深入理解技术以及参与开源社区。接着,提供了一系列高效阅读源码的方法,...
面试官:说一说Zookeeper中Leader选举机制.doc
基于Wavelet_leader_省略_映射算法的回转支承自适应特征提取
Redis-leader-by-lock 使用Redis Lock轻松实现集群领导者选举仅使用Spring-Boot和Redis动机几乎所有使用Spring Boot进行领导力选举的例子都转到Hazelcast(现在已弃用)和Zookeeper(在简单情况下过强)中的Spring ...
zkclient 项目项目介绍:zkclient 是对zookeeper java客户端进行的封装,主要实现了连接、断线重连,watch事件改为listen监听事件,分布式锁等注意: 使用时需要自行编译安装到maven或打成jar使用使用方式:...
Leader 选举分为 Zookeeper 集群初始化启动时选举和 Zookeeper 集群运行期间 Leader 重新选举两种情况。 在讲解 Leader 选举前先了解一下 Zookeeper 节点 4 种可能状态和事务ID概念。 本文是在zookeeper的部署与验证...
Redis 领导者Redis 支持的领导人选举要求Redis 2.6.12安装 npm install redis-leader例子 var Leader = require ( 'redis-leader' ) ;应用程序接口新领导者(redis,选项) 创建一个新的领导者redis是标识锁的字符串...
在第一节课,我们讲到了 zookeeper 的来源,是来自于 google chubby。为了解决在分布式环境下,如何从多个 server 中选举出 maste
etcd-leader 正在开发中,尚不适合生产使用。 领导者选举模块建立在强大的选举算法上,并经过彻底的测试。 用法 提供一个配置客户端。 请注意,此包不依赖于 node-etcd。 它与^4.0.0版本的 node-etcd 兼容。 var...
索布扎来自的稳定领导人选举算法的实现图 4 算法与图 5 优化。
12.2.5 zk是如何选举Leader的?
在战场环境中,战术分队的队形在面对复杂静态或动态障碍物时难以较好地保持,针对此问题,提出了基于Leader-Follower算法的改进队形控制方法。在leader寻径阶段,通过在战场导航网格中应用两阶段路径搜索方法,先...
Leader 选举在分布式场景中的应用demo代码
用MATLAB编写的leader_follower算法实现聚类的函数.希望能对大家有帮助....zip
Leader-follower 算法的matlab算法实现
zookeeper选举机制图,内讲述了zookeeper是如何选举出leader、fllower的
##Leader 选举使用 Raft 共识协议。 ###介绍 Raft 是一种共识算法,用于在分布式环境中选举领导者并管理复制日志。 它产生的结果等价于(multi-)Paxos,和Paxos一样高效,但它的结构与Paxos不同; 这使得 Raft 比...