Node Cache
使用节点数据作为本地缓存使用。这个类可以对节点进行监听,能够处理节点的增删改事件,数据同步等。 还可以通过注册自定义监听器来更细节的控制这些数据变动操作。
1. 关键 API
org.apache.curator.framework.recipes.cache.NodeCache
org.apache.curator.framework.recipes.cache.NodeCacheListener
org.apache.curator.framework.recipes.cache.ChildData
2. 机制说明
- 仅仅是单个数据的缓存
- 内部使用状态机作为不同操作的处理控制
3. 用法
3.1 创建
public NodeCache(CuratorFramework client, String path)
3.2 使用
还是一样的套路,在使用前需要调用start()
;用完之后需要调用close()
方法。
随时都可以调用getCurrentData()
获取当前缓存的状态和数据。
也可以通过getListenable()
获取监听器容器,并在此基础上增加自定义监听器:
public void addListener(NodeCacheListener listener)
4. 错误处理
NodeCache实例已经自带一个ConnectionStateListener
处理链接状态的变化。
5. 源码分析
5.1 类定义
public class NodeCache implements Closeable{}
- 实现了
java.io.Closeable
5.2 成员变量
public class NodeCache implements Closeable{ private final Logger log = LoggerFactory.getLogger(getClass()); private final CuratorFramework client; private final String path; private final boolean dataIsCompressed; private final AtomicReferencedata = new AtomicReference (null); private final AtomicReference state = new AtomicReference (State.LATENT); private final ListenerContainer listeners = new ListenerContainer (); private final AtomicBoolean isConnected = new AtomicBoolean(true); private ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) ) { if ( isConnected.compareAndSet(false, true) ) { try { reset(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Trying to reset after reconnection", e); } } } else { isConnected.set(false); } } }; private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { try { reset(); } catch(Exception e) { ThreadUtils.checkInterrupted(e); handleException(e); } } }; private enum State { LATENT, STARTED, CLOSED } private final BackgroundCallback backgroundCallback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { processBackgroundResult(event); } };}
- log
- client
- path
- 节点路径
- dataIsCompressed
- 数据是否压缩
- data
AtomicReference
- 存放着本地缓存数据
- 缓存数据被封装成
ChildData
- state
AtomicReference
- 状态
- 内部枚举
- LATENT (默认)
- STARTED
- CLOSED
- listeners
org.apache.curator.framework.listen.ListenerContainer
- 监听器容器
- isConnected
- 是否已连接ZK
AtomicBoolean
- connectionStateListener
- 自带的链接状态监听器
- watcher
- 自带的节点监听器
- 一旦节点变动,则调用
reset()
重置
- backgroundCallback
- 节点数据回调操作
- 避免线程阻塞
5.3 构造器
public NodeCache(CuratorFramework client, String path){ this(client, path, false);}public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed){ this.client = client; this.path = PathUtils.validatePath(path); this.dataIsCompressed = dataIsCompressed;}
构造器很简单,就是赋值处理。 所以,大部分逻辑还是在start()
中。
5.4 启动
public void start() throws Exception{ start(false);}public void start(boolean buildInitial) throws Exception{ Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once"); client.getConnectionStateListenable().addListener(connectionStateListener); if ( buildInitial ) { client.checkExists().creatingParentContainersIfNeeded().forPath(path); internalRebuild(); } reset();}private void internalRebuild() throws Exception{ try { Stat stat = new Stat(); byte[] bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(path) : client.getData().storingStatIn(stat).forPath(path); data.set(new ChildData(path, stat, bytes)); } catch ( KeeperException.NoNodeException e ) { data.set(null); }}private void reset() throws Exception{ if ( (state.get() == State.STARTED) && isConnected.get() ) { client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); }}
可以看到
- 默认是不进行缓存数据的初始化的
再来看看启动的过程
- 原子操作更新启动状态
- 为链接添加
connectionStateListener
监听器 - 如果需要初始化缓存
- 创建节点
- 调用
internalRebuild()
,初始数据- 同步节点数据与状态
- 写入本地缓存
- 调用
reset()
- 在正常状态时检查缓存节点是否存在
- 第3步可能是不错初始化动作的
- 为节点添加了
watcher
- 回调触发
backgroundCallback
- 调用
processBackgroundResult()
方法(状态机)
- 调用
- 在正常状态时检查缓存节点是否存在
5.4.1 processBackgroundResult方法
启动逻辑很大一部分在processBackgroundResult
方法中。所以,这里再来看看这个方法:
private void processBackgroundResult(CuratorEvent event) throws Exception{ switch ( event.getType() ) { case GET_DATA: { if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { ChildData childData = new ChildData(path, event.getStat(), event.getData()); setNewData(childData); } break; } case EXISTS: { if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() ) { setNewData(null); } else if ( event.getResultCode() == KeeperException.Code.OK.intValue() ) { if ( dataIsCompressed ) { client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); } else { client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path); } } break; } }}
由于启动在先,所以先来看看EXISTS
事件:
- 如果节点不存在
- 调用
setNewData(null)
,设置数据为null
- 调用
- 如果节点存在
- 如果数据需要压缩处理
- 则解压获取值
- 否则直接获取数据
- 如果数据需要压缩处理
- 好吧,这里又是通过
backgroundCallback
来回调获取值,所以,又会以新的新的状态回到processBackgroundResult
GET_DATA
- 状态机
再来看看GET_DATA
事件:
- 如果读取正常
- 构建
childData
- 调用
setNewData
,赋值
- 构建
5.4.2 setNewData方法
如此看来,还需要看看setNewData
方法:
private void setNewData(ChildData newData) throws InterruptedException{ ChildData previousData = data.getAndSet(newData); if ( !Objects.equal(previousData, newData) ) { listeners.forEach ( new Function() { @Override public Void apply(NodeCacheListener listener) { try { listener.nodeChanged(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Calling listener", e); } return null; } } ); if ( rebuildTestExchanger != null ) { try { rebuildTestExchanger.exchange(new Object()); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } } }}
- 本地缓存
data
赋新值 - 如果发现缓存有更新
- 触发监听容器中的监听器(同步调用)
rebuildTestExchanger
- 这个可以基本略过
- 测试时向其他现场发送一个信号对象
5.4.3 小结
启动过程大致可以分为
- 添加链接监听器
- 如果需要初始化节点,则创建节点,并拉取缓存数据到本地
- 为节点加上节点监听器,并挂载回调方法
- 通过回调状态机来同步缓存数据
5.5 获取缓存数据
public ChildData getCurrentData(){ return data.get();}
直接读取缓存数据。 对使用者来说,只需要操作本地缓存。 而本地缓存与ZK节点数据,通过监听器回调状态机来完成同步动作。
5.6 关闭
缓存用完,需要调用close()
public void close() throws IOException{ if ( state.compareAndSet(State.STARTED, State.CLOSED) ) { listeners.clear(); client.clearWatcherReferences(watcher); client.getConnectionStateListenable().removeListener(connectionStateListener); // TODO // From PathChildrenCache // This seems to enable even more GC - I'm not sure why yet - it // has something to do with Guava's cache and circular references connectionStateListener = null; watcher = null; } }
- 原子操作更新状态
- 清理监听器容器
- 清理掉节点上监听器
- 清理掉链接上的监听器
并没有制空本地缓存数据
6. 小结
与比起来,Node Cache要简单很多。
- Path Cache
- 更像是一个Cache Manager
- 在path下管理着多个cache
- 由于多个cache的存在
- 同步逻辑复杂
- 并发问题更为严重
- 所以内部使用了
- 命令模式
- 异步按序执行操作
- 所以内部使用了
- Node Cache
- 仅仅是单个数据的缓存
- 而且缓存数据的特性,也无需严格控制并发(脏数据也可以接受)
- 使用一个回调状态机来处理不同的数据状态