博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
[Curator] Node Cache 的使用与分析
阅读量:6273 次
发布时间:2019-06-22

本文共 7966 字,大约阅读时间需要 26 分钟。

  hot3.png

Node Cache

使用节点数据作为本地缓存使用。这个类可以对节点进行监听,能够处理节点的增删改事件,数据同步等。 还可以通过注册自定义监听器来更细节的控制这些数据变动操作。

1. 关键 API

org.apache.curator.framework.recipes.cache.NodeCache

org.apache.curator.framework.recipes.cache.NodeCacheListener

org.apache.curator.framework.recipes.cache.ChildData

2. 机制说明

  • 仅仅是单个数据的缓存
  • 内部使用状态机作为不同操作的处理控制

3. 用法

3.1 创建

public NodeCache(CuratorFramework client,                         String path)

3.2 使用

还是一样的套路,在使用前需要调用start();用完之后需要调用close()方法。

随时都可以调用getCurrentData()获取当前缓存的状态和数据。

也可以通过getListenable()获取监听器容器,并在此基础上增加自定义监听器:

public void addListener(NodeCacheListener listener)

4. 错误处理

NodeCache实例已经自带一个ConnectionStateListener处理链接状态的变化。

5. 源码分析

5.1 类定义

public class NodeCache implements Closeable{}
  • 实现了java.io.Closeable

5.2 成员变量

public class NodeCache implements Closeable{    private final Logger log = LoggerFactory.getLogger(getClass());    private final CuratorFramework client;    private final String path;    private final boolean dataIsCompressed;    private final AtomicReference
data = new AtomicReference
(null); private final AtomicReference
state = new AtomicReference
(State.LATENT); private final ListenerContainer
listeners = new ListenerContainer
(); private final AtomicBoolean isConnected = new AtomicBoolean(true); private ConnectionStateListener connectionStateListener = new ConnectionStateListener() { @Override public void stateChanged(CuratorFramework client, ConnectionState newState) { if ( (newState == ConnectionState.CONNECTED) || (newState == ConnectionState.RECONNECTED) ) { if ( isConnected.compareAndSet(false, true) ) { try { reset(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Trying to reset after reconnection", e); } } } else { isConnected.set(false); } } }; private Watcher watcher = new Watcher() { @Override public void process(WatchedEvent event) { try { reset(); } catch(Exception e) { ThreadUtils.checkInterrupted(e); handleException(e); } } }; private enum State { LATENT, STARTED, CLOSED } private final BackgroundCallback backgroundCallback = new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { processBackgroundResult(event); } };}
  • log
  • client
  • path
    • 节点路径
  • dataIsCompressed
    • 数据是否压缩
  • data
    • AtomicReference
    • 存放着本地缓存数据
    • 缓存数据被封装成ChildData
  • state
    • AtomicReference
    • 状态
    • 内部枚举
      • LATENT (默认)
      • STARTED
      • CLOSED
  • listeners
    • org.apache.curator.framework.listen.ListenerContainer
    • 监听器容器
  • isConnected
    • 是否已连接ZK
    • AtomicBoolean
  • connectionStateListener
    • 自带的链接状态监听器
  • watcher
    • 自带的节点监听器
    • 一旦节点变动,则调用reset()重置
  • backgroundCallback
    • 节点数据回调操作
    • 避免线程阻塞

5.3 构造器

public NodeCache(CuratorFramework client, String path){    this(client, path, false);}public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed){    this.client = client;    this.path = PathUtils.validatePath(path);    this.dataIsCompressed = dataIsCompressed;}

构造器很简单,就是赋值处理。 所以,大部分逻辑还是在start()中。

5.4 启动

public void     start() throws Exception{    start(false);}public void     start(boolean buildInitial) throws Exception{    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");    client.getConnectionStateListenable().addListener(connectionStateListener);    if ( buildInitial )    {        client.checkExists().creatingParentContainersIfNeeded().forPath(path);        internalRebuild();    }    reset();}private void     internalRebuild() throws Exception{    try    {        Stat    stat = new Stat();        byte[]  bytes = dataIsCompressed ? client.getData().decompressed().storingStatIn(stat).forPath(path) : client.getData().storingStatIn(stat).forPath(path);        data.set(new ChildData(path, stat, bytes));    }    catch ( KeeperException.NoNodeException e )    {        data.set(null);    }}private void     reset() throws Exception{    if ( (state.get() == State.STARTED) && isConnected.get() )    {        client.checkExists().creatingParentContainersIfNeeded().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);    }}

可以看到

  • 默认是不进行缓存数据的初始化的

再来看看启动的过程

  1. 原子操作更新启动状态
  2. 为链接添加connectionStateListener监听器
  3. 如果需要初始化缓存
    1. 创建节点
    2. 调用internalRebuild(),初始数据
      1. 同步节点数据与状态
      2. 写入本地缓存
  4. 调用reset()
    1. 在正常状态时检查缓存节点是否存在
      • 第3步可能是不错初始化动作的
      • 为节点添加了watcher
      • 回调触发backgroundCallback
        • 调用processBackgroundResult()方法(状态机)

5.4.1 processBackgroundResult方法

启动逻辑很大一部分在processBackgroundResult方法中。所以,这里再来看看这个方法:

private void processBackgroundResult(CuratorEvent event) throws Exception{    switch ( event.getType() )    {        case GET_DATA:        {            if ( event.getResultCode() == KeeperException.Code.OK.intValue() )            {                ChildData childData = new ChildData(path, event.getStat(), event.getData());                setNewData(childData);            }            break;        }        case EXISTS:        {            if ( event.getResultCode() == KeeperException.Code.NONODE.intValue() )            {                setNewData(null);            }            else if ( event.getResultCode() == KeeperException.Code.OK.intValue() )            {                if ( dataIsCompressed )                {                    client.getData().decompressed().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);                }                else                {                    client.getData().usingWatcher(watcher).inBackground(backgroundCallback).forPath(path);                }            }            break;        }    }}

由于启动在先,所以先来看看EXISTS事件:

  1. 如果节点不存在
    1. 调用setNewData(null),设置数据为null
  2. 如果节点存在
    1. 如果数据需要压缩处理
      1. 则解压获取值
    2. 否则直接获取数据
  • 好吧,这里又是通过backgroundCallback来回调获取值,所以,又会以新的新的状态回到processBackgroundResult
    • GET_DATA
    • 状态机

再来看看GET_DATA事件:

  1. 如果读取正常
    1. 构建childData
    2. 调用setNewData,赋值

5.4.2 setNewData方法

如此看来,还需要看看setNewData方法:

private void setNewData(ChildData newData) throws InterruptedException{    ChildData   previousData = data.getAndSet(newData);    if ( !Objects.equal(previousData, newData) )    {        listeners.forEach        (            new Function
() { @Override public Void apply(NodeCacheListener listener) { try { listener.nodeChanged(); } catch ( Exception e ) { ThreadUtils.checkInterrupted(e); log.error("Calling listener", e); } return null; } } ); if ( rebuildTestExchanger != null ) { try { rebuildTestExchanger.exchange(new Object()); } catch ( InterruptedException e ) { Thread.currentThread().interrupt(); } } }}
  1. 本地缓存data赋新值
  2. 如果发现缓存有更新
    1. 触发监听容器中的监听器(同步调用)
    2. rebuildTestExchanger
      • 这个可以基本略过
      • 测试时向其他现场发送一个信号对象

5.4.3 小结

启动过程大致可以分为

  1. 添加链接监听器
  2. 如果需要初始化节点,则创建节点,并拉取缓存数据到本地
  3. 为节点加上节点监听器,并挂载回调方法
  4. 通过回调状态机来同步缓存数据

5.5 获取缓存数据

public ChildData getCurrentData(){    return data.get();}

直接读取缓存数据。 对使用者来说,只需要操作本地缓存。 而本地缓存与ZK节点数据,通过监听器回调状态机来完成同步动作。

5.6 关闭

缓存用完,需要调用close()

public void close() throws IOException{    if ( state.compareAndSet(State.STARTED, State.CLOSED) )    {        listeners.clear();        client.clearWatcherReferences(watcher);        client.getConnectionStateListenable().removeListener(connectionStateListener);        // TODO        // From PathChildrenCache        // This seems to enable even more GC - I'm not sure why yet - it        // has something to do with Guava's cache and circular references        connectionStateListener = null;        watcher = null;    }        }
  • 原子操作更新状态
  • 清理监听器容器
  • 清理掉节点上监听器
  • 清理掉链接上的监听器

并没有制空本地缓存数据

6. 小结

与比起来,Node Cache要简单很多。

  • Path Cache
    • 更像是一个Cache Manager
    • 在path下管理着多个cache
    • 由于多个cache的存在
      • 同步逻辑复杂
      • 并发问题更为严重
        • 所以内部使用了
          • 命令模式
          • 异步按序执行操作
  • Node Cache
    • 仅仅是单个数据的缓存
    • 而且缓存数据的特性,也无需严格控制并发(脏数据也可以接受)
    • 使用一个回调状态机来处理不同的数据状态

转载于:https://my.oschina.net/roccn/blog/918458

你可能感兴趣的文章
以太坊ERC20代币合约优化版
查看>>
Why I Began
查看>>
同一台电脑上Windows 7和Ubuntu 14.04的CPU温度和GPU温度对比
查看>>
js数组的操作
查看>>
springmvc Could not write content: No serializer
查看>>
Python系语言发展综述
查看>>
新手 开博
查看>>
借助开源工具高效完成Java应用的运行分析
查看>>
163 yum
查看>>
第三章:Shiro的配置——深入浅出学Shiro细粒度权限开发框架
查看>>
80后创业的经验谈(转,朴实但实用!推荐)
查看>>
让Windows图片查看器和windows资源管理器显示WebP格式
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
vim使用点滴
查看>>
embedded linux学习中几个需要明确的概念
查看>>
mysql常用语法
查看>>
Morris ajax
查看>>
【Docker学习笔记(四)】通过Nginx镜像快速搭建静态网站
查看>>
ORA-12514: TNS: 监听程序当前无法识别连接描述符中请求的服务
查看>>