zk 分布式同步功能,可以由3个角色组成: zk config manager(消息生产) , zk config server(消息生产2), zk config client(消息消费)
zk config manager: 任务部署在web server端,主要功能:
- 负责创建管理所有的“serverType” ,即一级节点,用于区分不同的配置内容, 可由 管理员通过前台界面配置一共有几个一级节点;比如一个“serverType” 用于存储集群中多个worker的相关信息, 可以给worker集群设置一个一级节点为”workers”
- 检测并保持与ZK 之间session连接状态;
zk config server: 任务部署在Worker集群中的机器上, 主要功能:
- 每个serverType都有多个子节点,子节点由configServer实例负责注册。比如worker集群中每个worker上的节点就是”worker的IP地址”, 作为”workers”一级节点下的二级节点:
- 对关注的datanote 的watch 做循环注册: 即在watcher监听事件后, 补充再次注册一下该节点的watcher
class InnerZK implements Watcher { public void process(WatchedEvent event) { // 如果是“数据变更”事件 if (event.getType() != EventType.None) { System.out.println("ConfigServer watch------path:" + event.getPath() + ";state" + event.getState() + ";type:" + event.getType()); switch (event.getType()) { case NodeCreated: register(); break; case NodeDeleted: // 如果父节点被删除,那么此后子节点也将不复存在 path = null; register();// 注册watch,检测父节点/serverType再次创建。 break; default: break; } return; } // 如果是链接状态迁移 // 参见keeperState switch (event.getState()) { ... } } }
private boolean register() {
lock.lock();
init = false;
try {
Stat stat = zkClient.exists("/" + serverType, true);// 注册“/serverType”的watch,跟踪节点的创建/删除
// 创建跟节点:/cache-server
// 如果跟节点不存在,则等待configManager去创建,创建成功后,将会在下文的watch事件中创建此子节点。
if (stat == null) {
return false;
}
init = true;
synchronized (tag) {
tag.notifyAll();
}
} catch (NodeExistsException ne) {
// ignore.
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return true;
}
- 检测并保持与zk 之间的session连接状态;
zk config client: 任务部署在Worker上, 主要功能:
- 监听各serverType 节点和子节点上的 目录,数据变化情况,根据变化做相关业务逻辑;
- 检测本worker上工作进程的状态, 修改相关datanote信息
- 检测并保持zk 之间的session连接状态;
如何检测并保持session连接状态?
以上所有的zk client 都需要检测 保持session连接状态,防止网络不稳定,实现上有2种方案:
方案1: 利用watcher 回调通知监控: 实现watcher类, 类中process 函数,处理watchedevent, 通过检查回调函数返回的event.getState(),判断zk client与zookeeper 之间的session状态变化:
class InnerZK implements Watcher {
public void process(WatchedEvent event) {
// “数据变更”事件
if (event.getType() != EventType.None) {
System.out.println("ConfigClient watch------path:" + event.getPath() + ";state" + event.getState() + ";type:" + event.getType());
...
return;
}
// 如果是链接状态迁移
switch (event.getState()) {
case SyncConnected:
System.out.println("Config manager watch---Connected...");
rebuild();// 每次重连,都检测一下数据状态。
outdate = false;
break;
case Expired:
System.out.println("ConfigClient watch------Expired...");
// session重建
outdate = true;
break;
// session过期
case Disconnected:
// 链接断开,或session迁移
System.out.println("ConfigClient watch------Connecting....");
break;
case AuthFailed:
if (autoReconnected && thread.isAlive()) {
thread.interrupt();
}
throw new RuntimeException("ConfigClient watch------ZK Connection auth failed...");
default:
break;
}
}
}
方案2.创建监听线程定时主动轮询, 下面代码是基于线程主动轮询方式:
public ConfigManager(String connectString,boolean autoReconneted) { this.connectString=connectString; this.autoReconnected = autoReconneted; if (this.autoReconnected) { thread = new Thread(new FailureHandler()); thread.setDaemon(true); thread.start(); }else{ try { // 回话重建等异常行为 zkClient = new ZooKeeper(connectString, 3000, dw, false); System.out.println("Reconnected success!..."); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } }
class FailureHandler implements Runnable {
/**
* zk故障担保线程,如果需要故障检测或者容错,请将此实例交付给单独线程执行
* 比如:因为网络问题,zk实例将可能长时间处于无法链接状态,或者其它异常,导致zk实例化出错等
*/
public void run() {
try {
int i = 0;
int l = 100;// 每次重建,将时间延迟100ms
while (true) {
//System.out.println("Manager handler,running...tid: " + Thread.currentThread().getId());
if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) {
lock.lock();
try {
// 回话重建等异常行为
zkClient = new ZooKeeper(connectString, 3000, dw, false);//Constants.connectString
System.out.println("Reconnected success!...");
} catch (Exception e) {
e.printStackTrace();
i++;
Thread.sleep(3000 + i * l);// 在zk环境异常情况下,每3秒重试一次
} finally {
lock.unlock();
}
continue;
}
if(zkClient.getState().isConnected()){
Thread.sleep(3000);// 如果被“中断”,直接退出
i = 0;
}
}
} catch (InterruptedException e) {
System.out.println("Exit...");
if(zkClient != null){
try{
zkClient.close();
}catch(Exception ze){
ze.printStackTrace();
}
}
}
}
}
没有评论