在zookeeper中,watcher机制来通知客户端其服务端的节点信息发生的变化。原理图如下:
多个分布式进程通过ZooKeeper提供的API来操作共享的ZooKeeper内存数据对象ZNode来达成某种一致的行为或结果,这种模式本质上是基于状态共享的并发模型,与Java的多线程并发模型一致,他们的线程或进程都是”共享式内存通信“。
Watch机制是异步非阻塞的主动通知模式即,使得分布式进程之间的“共享状态通信”更加实时高效;
- ZooKeeper的Watch架构
Watch的整体流程如下图所示,Client先向ZooKeeper服务端成功注册想要监听的节点状态,同时Client端本地会存储该监听器相关的信息在WatchManager中;
当ZooKeeper服务端监听的数据状态发生变化时,ZooKeeper就会主动通知发送相应事件信息给相关Client端,Client端线程从watchermanager中取出对应的watcher对象,根据通知类型和节点路径,来处理回调逻辑,对客户端数据做出相应处理。
zookeeper使用watchedevent对象来封装服务端事件并传递给watcher
watchedevent结构,其包括三个基本属性:
- 通知状态(keepstate):表示当事件发生时ZooKeeper的状态
- 事件类型(eventType):主要是ZNode子节点的变化
- 节点路径(path):znode路径
具体的描述见下表:
KeeperState | EventType | TriggerCondition | EnableCalls | 说明 |
SyncConnected (3)
|
None
(-1) |
客户端与服务器成功建立会话 | 此时客户端与服务器处于连接状态 | |
同上 | NodeCreated
(1) |
Watcher监听的对应数据节点被创建 | Exists | 同上 |
同上 | NodeDeleted
(2) |
Watcher监听的对应数据节点被删除 | Exists, GetData, and GetChildren | 同上 |
同上 | NodeDataChanged
(3) |
Watcher监听的数据节点的数据内容和数据版本号发生变化 | Exists and GetData | 同上 |
同上 | NodeChildrenChanged
(4) |
Watcher监听的数据节点的子节点列表发生变化,子节点内容变化不会触发 | GetChildren | 同上 |
Disconnected
(0) |
None
(-1) |
客户端与ZooKeeper服务器断开连接 | 此时客户端与服务器处于断开连接的状态 | |
Expried
(-112) |
None
(-1) |
会话超时 | 此时客户端会话失效,通常同时也会收到SessionExpiredException异常 | |
AuthFailed
(4) |
None
(-1) |
通常有两种情况:
1.使用错误的scheme进行权限检查 2.SASL权限检查失败 |
收到AuthFailedException异常 |
各角色对Watch的处理流程图:
-
如何注册Watch事件?
ZOO_CREATED_EVENT:节点创建事件,需要watch一个不存在的节点,当节点被创建时触发,此watch通过zoo_exists()设置
ZOO_DELETED_EVENT:节点删除事件,此watch通过zoo_exists()或zoo_get()设置
ZOO_CHANGED_EVENT:节点数据改变事件,此watch通过zoo_exists()或zoo_get()设置
ZOO_CHILD_EVENT:子节点列表改变事件,此watch通过zoo_get_children()或zoo_get_children2()设置
ZOO_SESSION_EVENT:会话失效事件,客户端与服务端断开或重连时触发
ZOO_NOTWATCHING_EVENT:watch移除事件,服务端出于某些原因不再为客户端watch节点时触发
一个Watcher的样例&循环注册样例(java):
class InnerZK implements Watcher {
public void process(WatchedEvent event) {
// 如果是“数据变更”事件
if (event.getType() != EventType.None) {
switch (event.getType()) {
// 如果其父节点(/serverType)被创建,
// 此时configServer也开始注册其子节点信息,watcher在下文中SyncConnected中注册。
case NodeCreated:
register();
break;
case NodeDeleted:
// 如果父节点被删除,那么此后子节点也将不复存在
path = null;
register();// 重新注册watch,检测父节点/serverType再次创建。
break;
default:
break;
}
return;
}
// 如果是链接状态迁移
// 参见keeperState
switch (event.getState()) {
case SyncConnected:
System.out.println("Connected...");
// 如果path == null,则表明是首次链接或者session重建。
if (path == null) {
try {
register();// 创建子节点,并对其父节点注册watch。
outdate = false;
} catch (Exception e) {
e.printStackTrace();
}
}
break;
case Expired:
System.out.println("Expired...");
outdate = true;
init = true;
synchronized (tag) {
tag.notifyAll();
}
break;
// session过期
case Disconnected:
// 链接断开,或session迁移
System.out.println("Connecting....");
break;
case AuthFailed:
init = true;
synchronized (tag) {
tag.notifyAll();
}
if (autoReconnected && thread.isAlive()) {
thread.interrupt();
}
throw new RuntimeException("ZK Connection auth failed...");
default:
break;
}
}
}
private boolean register() {
lock.lock();
init = false;
try {
Stat stat = zkClient.exists("/" + serverType, true);// 注册“父节点”watch,跟踪父节点的创建/删除
// 创建跟节点:/cache-server
// 如果跟节点不存在,则等待configManager去创建,创建成功后,将会在下文的watch事件中创建此子节点。
if (stat == null) {
return false;
}
init = true;
synchronized (tag) {
tag.notifyAll();
}
} catch (NodeExistsException ne) {
// ignore.
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return true;
}
- 需要注意:
(1)事件注册是一次性的,因为在每次处理事件之后,就会将相应的watcher注册删除;
(2)客户端接收到的服务端watcher事件中并不包含事件更改的具体内容,只是告知发生了这样一个watcher事件,所以,客户端在接收到watcher事件之后,需要在回调函数process中对服务端的数据进行重新获取,才能获得更改的具体内容。
没有评论