zookeeper 如何实现分布式状态通知?

2018年10月14日

在zookeeper中,watcher机制来通知客户端其服务端的节点信息发生的变化。原理图如下:

多个分布式进程通过ZooKeeper提供的API来操作共享的ZooKeeper内存数据对象ZNode来达成某种一致的行为或结果,这种模式本质上是基于状态共享的并发模型,与Java的多线程并发模型一致,他们的线程或进程都是”共享式内存通信“。

Watch机制是异步非阻塞的主动通知模式即,使得分布式进程之间的“共享状态通信”更加实时高效;

  • ZooKeeper的Watch架构

Watch的整体流程如下图所示,Client先向ZooKeeper服务端成功注册想要监听的节点状态,同时Client端本地会存储该监听器相关的信息在WatchManager中;

当ZooKeeper服务端监听的数据状态发生变化时,ZooKeeper就会主动通知发送相应事件信息给相关Client端,Client端线程从watchermanager中取出对应的watcher对象,根据通知类型和节点路径,来处理回调逻辑,对客户端数据做出相应处理。

zookeeper使用watchedevent对象来封装服务端事件并传递给watcher

watchedevent结构,其包括三个基本属性:

  • 通知状态(keepstate):表示当事件发生时ZooKeeper的状态
  • 事件类型(eventType):主要是ZNode子节点的变化
  • 节点路径(path):znode路径

具体的描述见下表:

KeeperState EventType TriggerCondition EnableCalls 说明
 

 

SyncConnected

(3)

 

 

None

(-1)

客户端与服务器成功建立会话 此时客户端与服务器处于连接状态
同上 NodeCreated

(1)

Watcher监听的对应数据节点被创建 Exists 同上
同上 NodeDeleted

(2)

Watcher监听的对应数据节点被删除 Exists, GetData, and GetChildren 同上
同上 NodeDataChanged

(3)

Watcher监听的数据节点的数据内容和数据版本号发生变化 Exists and GetData 同上
同上 NodeChildrenChanged

(4)

Watcher监听的数据节点的子节点列表发生变化,子节点内容变化不会触发 GetChildren 同上
Disconnected

(0)

None

(-1)

客户端与ZooKeeper服务器断开连接 此时客户端与服务器处于断开连接的状态
Expried

(-112)

None

(-1)

会话超时 此时客户端会话失效,通常同时也会收到SessionExpiredException异常
AuthFailed

(4)

None

(-1)

通常有两种情况:

1.使用错误的scheme进行权限检查

2.SASL权限检查失败

收到AuthFailedException异常

各角色对Watch的处理流程图:

 

  • 如何注册Watch事件?

ZOO_CREATED_EVENT:节点创建事件,需要watch一个不存在的节点,当节点被创建时触发,此watch通过zoo_exists()设置
ZOO_DELETED_EVENT:节点删除事件,此watch通过zoo_exists()或zoo_get()设置
ZOO_CHANGED_EVENT:节点数据改变事件,此watch通过zoo_exists()或zoo_get()设置
ZOO_CHILD_EVENT:子节点列表改变事件,此watch通过zoo_get_children()或zoo_get_children2()设置
ZOO_SESSION_EVENT:会话失效事件,客户端与服务端断开或重连时触发
ZOO_NOTWATCHING_EVENT:watch移除事件,服务端出于某些原因不再为客户端watch节点时触发

 

一个Watcher的样例&循环注册样例(java):

    class InnerZK implements Watcher {

        public void process(WatchedEvent event) {
            // 如果是“数据变更”事件
            if (event.getType() != EventType.None) {
                switch (event.getType()) {
                    // 如果其父节点(/serverType)被创建,
                    // 此时configServer也开始注册其子节点信息,watcher在下文中SyncConnected中注册。
                    case NodeCreated:
                        register();
                        break;
                    case NodeDeleted:
                        // 如果父节点被删除,那么此后子节点也将不复存在
                        path = null;
                        register();// 重新注册watch,检测父节点/serverType再次创建。
                        break;
                    default:
                        break;
                }
                return;
            }
            // 如果是链接状态迁移
            // 参见keeperState
            switch (event.getState()) {
                case SyncConnected:
                    System.out.println("Connected...");
                    // 如果path == null,则表明是首次链接或者session重建。
                    if (path == null) {
                        try {
                            register();// 创建子节点,并对其父节点注册watch。
                            outdate = false;
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                    break;
                case Expired:
                    System.out.println("Expired...");
                    outdate = true;
                    init = true;
                    synchronized (tag) {
                        tag.notifyAll();
                    }
                    break;
                // session过期
                case Disconnected:
                    // 链接断开,或session迁移
                    System.out.println("Connecting....");
                    break;
                case AuthFailed:
                    init = true;
                    synchronized (tag) {
                        tag.notifyAll();
                    }
                    if (autoReconnected && thread.isAlive()) {
                        thread.interrupt();
                    }
                    throw new RuntimeException("ZK Connection auth failed...");
                default:
                    break;
            }
        }
    }



private boolean register() {
    lock.lock();
    init = false;
    try {
        Stat stat = zkClient.exists("/" + serverType, true);// 注册“父节点”watch,跟踪父节点的创建/删除
        // 创建跟节点:/cache-server
        // 如果跟节点不存在,则等待configManager去创建,创建成功后,将会在下文的watch事件中创建此子节点。
        if (stat == null) {
            return false;
        }
        
        init = true;
        synchronized (tag) {
            tag.notifyAll();
        }
    } catch (NodeExistsException ne) {
        // ignore.
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        lock.unlock();
    }
    return true;
}
  • 需要注意:

(1)事件注册是一次性的,因为在每次处理事件之后,就会将相应的watcher注册删除;

(2)客户端接收到的服务端watcher事件中并不包含事件更改的具体内容,只是告知发生了这样一个watcher事件,所以,客户端在接收到watcher事件之后,需要在回调函数process中对服务端的数据进行重新获取,才能获得更改的具体内容。

没有评论

发表评论

电子邮件地址不会被公开。 必填项已用*标注