上篇文章介绍了Curator中的分布式计数器及其原理,本篇文章会对Curator下的监听机制。Zookeeper原生支持对节点事件进行监听,Curator也封装了原生的操作,下来先来看下基于
org.apache.zookeeper.Watcher的原生监听方式 原生监听-usingWatcher
先看下org.apache.curator.framework.CuratorFramework#getData方法的返回值GetDataBuilder的构造
public interface GetDataBuilder extends
Watchable<BackgroundPathable<byte[]>>,//事件监听
BackgroundPathable<byte[]>,//异步操作
Statable<WatchPathable<byte[]>>,//节点属性存储相关
Decompressible<GetDataWatchBackgroundStatable>//数据压缩
{
}
其中org.apache.curator.framework.api.Watchable接口中方法如下
public interface Watchable<T>
{
public T watched();
//设置监听器
public T usingWatcher(Watcher watcher);
public T usingWatcher(CuratorWatcher watcher);
}
我们可以使用usingWatcher()方法来新增一个节点的监听器
具体使用案例如下
public class Watcher {
public static void main(String[] args) throws Exception {
CuratorFramework zkClient = getZkClient();
String path = "/watchNode";
byte[] initData = "initData".getBytes();
//先创建一个用于事件监听的测试节点
zkClient.create().forPath(path, initData);
//设置监听器
zkClient.getData().usingWatcher(new org.apache.zookeeper.Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("监听到节点事件:" + JSON.toJSONString(watchedEvent));
}
}).forPath(path);
//第一次更新
zkClient.setData().forPath(path, "1".getBytes());
//第二次更新
zkClient.setData().forPath(path, "2".getBytes());
//Sleep等待监听事件触发
Thread.sleep(Integer.MAX_VALUE);
}
private static CuratorFramework getZkClient() {
String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
return zkClient;
}
}
在命令行出输出结果如下
可以看到,代码中进行了两次修改,监听事件却只触发了一次,类型为NodeDataChanged,这也是原生监听事件的不足,即原生Watch事件只能触发一次
Curator-Cache
为了免去开发人员重复注册Watcher的麻烦,org.apache.curator.framework.recipes.cache下提供了对监听监听事件的高级封装,主要类有以下三个
| 类名 | 用途 |
|---|---|
| NodeCache | 监听节点对应增、删、改操作 |
| PathChildrenCache | 监听节点下一级子节点的增、删、改操作 |
| TreeCache | 可以将指定的路径节点作为根节点,对其所有的子节点操作进行监听,呈现树形目录的监听 |
下面分别介绍用法,NodeCache由于其实现较为简单,也会分析下原理
NodeCache
先看下用法
public class Watcher {
public static void main(String[] args) throws Exception {
CuratorFramework zkClient = getZkClient();
String path = "/nodeCache";
byte[] initData = "initData".getBytes();
//创建节点用于测试
zkClient.create().forPath(path, initData);
NodeCache nodeCache = new NodeCache(zkClient, path);
//调用start方法开始监听
nodeCache.start();
//添加NodeCacheListener监听器
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("监听到事件变化,当前数据:"+new String(nodeCache.getCurrentData().getData()));
}
});
//第一次更新
zkClient.setData().forPath(path, "first update".getBytes());
Thread.sleep(1000);
//第二次更新
zkClient.setData().forPath(path, "second update".getBytes());
Thread.sleep(1000);
//第三次更新
zkClient.setData().forPath(path, "third update".getBytes());
Thread.sleep(Integer.MAX_VALUE);
}
private static CuratorFramework getZkClient() {
String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
return zkClient;
}
}
命令行输出如下,可以看到,事件监听机制没有失效,可以重复触发
监听到事件变化,当前数据:first update
监听到事件变化,当前数据:second update
监听到事件变化,当前数据:third update
下面再来分析下NodeCache实现原理,先画出重点,核心类如下
org.apache.curator.framework.recipes.cache.NodeCache
//nodeCache对应监听器
org.apache.curator.framework.recipes.cache.NodeCacheListener
//nodeCache内部节点信息存储对象
org.apache.curator.framework.recipes.cache.ChildData
先从NodeCache入手,看下内部有哪些属性
public class NodeCache implements Closeable
{
//zk客户端
private final CuratorFramework client;
//监听路径
private final String path;
//是否开启数据压缩
private final boolean dataIsCompressed;
//本地节点信息(监听的zk节点的备份) 使用AtomicReference来确保更新的原子性
private final AtomicReference<ChildData> data = new AtomicReference<ChildData>(null);
//Nodecache状态 使用AtomicReference来确保更新的原子性
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
//监听器
private final ListenerContainer<NodeCacheListener> listeners = new ListenerContainer<NodeCacheListener>();
//是否连接
private final AtomicBoolean isConnected = new AtomicBoolean(true);
//zk客户端监听器 监听连接事件
private ConnectionStateListener connectionStateListener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) )
{
if ( isConnected.compareAndSet(false, true) )
{
try
{
//监听到连接事件后,重新为节点设置监听事件(usingWatcher)
reset();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("Trying to reset after reconnection", e);
}
}
}
else
{
isConnected.set(false);
}
}
};
//usingWatcher 时使用的watcher
private Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
try
{
reset();
}
catch(Exception e)
{
ThreadUtils.checkInterrupted(e);
handleException(e);
}
}
};
//nodeCache状态
private enum State
{
//初始状态
LATENT,
//调用start()方法变为启动
STARTED,
//调用close()方法变为关闭
CLOSED
}
//异步后台处理事件,触发后重新设置监听事件
private final BackgroundCallback backgroundCallback = new BackgroundCallback()
{
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
processBackgroundResult(event);
}
};
}
可以看到几个关键属性如下
AtomicReference<ChildData> data将zk节点信息在本地做备份存储Watcher watcher监听器,每次触发时重新设置用到的监听器
再从流程入口start()方法开始分析其原理
public void start(boolean buildInitial) throws Exception
{
//因为state被声明为AtomicReference类型,利用其原子特性,保证nodeCache实例只能被启动一次
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
//为zkClient设置链接监听器,监听器监听到连接事件后会触发reset方法
client.getConnectionStateListenable().addListener(connectionStateListener);
//如果设置buildInitial为true,会在启动时在internalRebuild()方法中将zk节点信息缓存到本地
if ( buildInitial )
{
client.checkExists().creatingParentContainersIfNeeded().forPath(path);
internalRebuild();
}
//设置usingWatcher监听事件
reset();
}
先看下internalRebuild()初始化本地节点信息方法
private void internalRebuild() throws Exception
{
try
{
Stat stat = new Stat();
//如果开启了数据压缩,先解压,否则直接读取
byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(path) : client.getData().storingStatIn(stat).forPath(path);
//将节点封装成本地ChildData并存储
data.set(new ChildData(path, stat, bytes));
}
catch ( KeeperException.NoNodeException e )
{
data.set(null);
}
}
好多地方都看到了reset(),看下源码
private void reset() throws Exception
{
//如果nodeCache状态为启动,并且zk连接状态为true
if ( (state.get() == State.STARTED) && isConnected.get() )
{
//给节点设置watcher事件,同时利用zk异步特性,在backgroundCallback中处理
client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
}
异步处理回调backgroundCallback中processBackgroundResult方法节点变化后的回调事件,使用异步的主要原因是为了防止线程阻塞
代码如下
private void processBackgroundResult(CuratorEvent event) throws Exception
{
switch ( event.getType() )
{
//如果是getData()事件,并且返回成功
case GET_DATA:
{
if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
ChildData childData = new ChildData(path, event.getStat(), event.getData());
//缓存事件 并触发NodeCacheListener中nodeChanged事件
setNewData(childData);
}
break;
}
//EXISTS对应checkExists()操作
case EXISTS:
{
//如果节点不存在 设置空
if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )
{
setNewData(null);
}
//节点存在 且操作成功
else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
{
if ( dataIsCompressed )
{
//如果开了数据压缩,先解压,再设置监听器和异步回调
client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
else
{
//设置监听器,并设置异步回调
client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);
}
}
break;
}
}
}
更新节点数据方法setNewData方法如下
private void setNewData(ChildData newData) throws InterruptedException
{
//原子更新本地缓存中节点数据
ChildData previousData = data.getAndSet(newData);
if ( !Objects.equal(previousData, newData) )
{
//循环触发每个监听器的nodeChanged事件
listeners.forEach
(
new Function<NodeCacheListener, Void>()
{
@Override
public Void apply(NodeCacheListener listener)
{
try
{
listener.nodeChanged();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("Calling listener", e);
}
return null;
}
}
);
//可以忽略 测试用
if ( rebuildTestExchanger != null )
{
try
{
rebuildTestExchanger.exchange(new Object());
}
catch ( InterruptedException e )
{
Thread.currentThread().interrupt();
}
}
}
}
可以看到NodeCache解决重复注册的关键点就在于processBackgroundResult()方法,其作用类似于一个状态机(递归),能够不断重复设置监听器,设置异步回调来更新NodeCache中节点数据并触发监听器nodeChanged()事件
下面整理下整体流程
- 调用start()方法
- 为zk客户端设置连接监听器
- 若设置了初始化节点值参数为true,则读取zk节点信息并缓存到本地
- 调用reset方法并使用usingWatcher()方法给节点设置监听器,同时设置异步回调方法backgroundCallback(),来不断触发本地节点缓存的更新以及重新设置节点监听器
PathChildrenCache
用法基本同nodeCache
public class Watcher {
public static void main(String[] args) throws Exception {
CuratorFramework zkClient = getZkClient();
String path = "/pathChildrenCache";
byte[] initData = "initData".getBytes();
//创建节点用于测试
zkClient.create().forPath(path, initData);
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, path, true);
//调用start方法开始监听 ,设置启动模式为同步加载节点数据
pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
//添加监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
System.out.println("节点数据变化,类型:" + event.getType() + ",路径:" + event.getData().getPath());
}
});
String childNodePath = path + "/child";
//创建子节点
zkClient.create().forPath(childNodePath, "111".getBytes());
Thread.sleep(1000);
//更新子节点
zkClient.setData().forPath(childNodePath, "222".getBytes());
Thread.sleep(1000);
//删除子节点
zkClient.delete().forPath(childNodePath);
Thread.sleep(Integer.MAX_VALUE);
}
private static CuratorFramework getZkClient() {
String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
return zkClient;
}
}
输出如下,可以看到监听到了子节点的增(CHILD_ADD)、删(CHILD_REMOVED)、改(CHILD_UPDATED)事件
使用PathChildrenCache需注意以下两点
- 无法对监听路径所在节点进行监听(即不能监听path对应节点的变化)
- 只能监听path对应节点下一级目录的子节点的变化内容(即只能监听/path/node1的变化,而不能监听/path/node1/node2 的变化)
- PathChildrenCache在调用start()方法时,有3种启动模式,分别为
NORMAL-初始化缓存数据为空
BUILD_INITIAL_CACHE-在start方法返回前,初始化获取每个子节点数据并缓存
POST_INITIALIZED_EVENT-在后台异步初始化数据完成后,会发送一个INITIALIZED初始化完成事件
TreeCache
用法如下
public class Watcher {
public static void main(String[] args) throws Exception {
CuratorFramework zkClient = getZkClient();
String path = "/treeCache";
byte[] initData = "initData".getBytes();
//创建节点用于测试
zkClient.create().forPath(path, initData);
TreeCache treeCache = new TreeCache(zkClient, path);
//调用start方法开始监听
treeCache.start();
//添加TreeCacheListener监听器
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
System.out.println("监听到节点数据变化,类型:"+event.getType()+",路径:"+event.getData().getPath());
}
});
Thread.sleep(1000);
//更新父节点数据
zkClient.setData().forPath(path, "222".getBytes());
Thread.sleep(1000);
String childNodePath = path + "/child";
//创建子节点
zkClient.create().forPath(childNodePath, "111".getBytes());
Thread.sleep(1000);
//更新子节点
zkClient.setData().forPath(childNodePath, "222".getBytes());
Thread.sleep(1000);
//删除子节点
zkClient.delete().forPath(childNodePath);
Thread.sleep(Integer.MAX_VALUE);
}
private static CuratorFramework getZkClient() {
String zkServerAddress = "127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184";
ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3, 5000);
CuratorFramework zkClient = CuratorFrameworkFactory.builder()
.connectString(zkServerAddress)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
return zkClient;
}
}

图一里可以看到 监听到了父节点的数据变化,但是有个空指针异常,这是因为TreeNode不像NodeCache和PathChidrenCache在start()时能传入参数来决定是否初始化本地缓存中的节点数据,TreeNode在start()后没有初始化好本地节点缓存,这个时候在监听器内调用event.getData()方法时实际取的是本地缓存中的节点数据,所以会报错。所以正确的写法应该在监听器的处理方法中根据event事件类型来做不同处理,不能无脑调用getData()方法。
图二里可以看到子节点信息变化也监听到了







还没有评论,来说两句吧...