简易智能识别服务3.1–使用zookeeper 实现简单的分布式信息同步

2018年10月22日

zk 分布式同步功能,可以由3个角色组成:  zk config manager(消息生产) ,   zk config server(消息生产2),   zk config client(消息消费)

zk config manager:  任务部署在web server端,主要功能:

  • 负责创建管理所有的“serverType” ,即一级节点,用于区分不同的配置内容,   可由 管理员通过前台界面配置一共有几个一级节点;比如一个“serverType” 用于存储集群中多个worker的相关信息,  可以给worker集群设置一个一级节点为”workers”
  •  检测并保持与ZK 之间session连接状态;

 

zk config server: 任务部署在Worker集群中的机器上, 主要功能:

  • 每个serverType都有多个子节点,子节点由configServer实例负责注册。比如worker集群中每个worker上的节点就是”worker的IP地址”, 作为”workers”一级节点下的二级节点:

  • 对关注的datanote 的watch 做循环注册:  即在watcher监听事件后, 补充再次注册一下该节点的watcher
class InnerZK implements Watcher {

    public void process(WatchedEvent event) {
        // 如果是“数据变更”事件
        if (event.getType() != EventType.None) {
            System.out.println("ConfigServer watch------path:" + event.getPath() + ";state" + event.getState() + ";type:" + event.getType());
            switch (event.getType()) {
                case NodeCreated:
                    register();
                    break;
                case NodeDeleted:
                    // 如果父节点被删除,那么此后子节点也将不复存在
                    path = null;
                    register();// 注册watch,检测父节点/serverType再次创建。
                    break;
                default:
                    break;
            }
            return;
        }
        // 如果是链接状态迁移
        // 参见keeperState
        switch (event.getState()) {
            ...
        }
    }
}
private boolean register() {
        lock.lock();
        init = false;
        try {
            Stat stat = zkClient.exists("/" + serverType, true);// 注册“/serverType”的watch,跟踪节点的创建/删除
            // 创建跟节点:/cache-server
            // 如果跟节点不存在,则等待configManager去创建,创建成功后,将会在下文的watch事件中创建此子节点。
            if (stat == null) {
                return false;
            }

            init = true;
            synchronized (tag) {
                tag.notifyAll();
            }
        } catch (NodeExistsException ne) {
            // ignore.
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return true;
    }
  • 检测并保持与zk 之间的session连接状态;

zk config client: 任务部署在Worker上, 主要功能:

  • 监听各serverType 节点和子节点上的 目录,数据变化情况,根据变化做相关业务逻辑;
  • 检测本worker上工作进程的状态, 修改相关datanote信息
  • 检测并保持zk 之间的session连接状态;

 

如何检测并保持session连接状态?

以上所有的zk client 都需要检测 保持session连接状态,防止网络不稳定,实现上有2种方案:

方案1:    利用watcher 回调通知监控:  实现watcher类, 类中process 函数,处理watchedevent,  通过检查回调函数返回的event.getState(),判断zk client与zookeeper 之间的session状态变化:

class InnerZK implements Watcher {

        public void process(WatchedEvent event) {
            // “数据变更”事件
            if (event.getType() != EventType.None) {
                System.out.println("ConfigClient watch------path:" + event.getPath() + ";state" + event.getState() + ";type:" + event.getType());
                ...
                return;
            }
            // 如果是链接状态迁移
            switch (event.getState()) {
                case SyncConnected:
                    System.out.println("Config manager watch---Connected...");
                    rebuild();// 每次重连,都检测一下数据状态。
                    outdate = false;
                    break;
                case Expired:
                    System.out.println("ConfigClient watch------Expired...");
                    // session重建
                    outdate = true;
                    break;
                // session过期
                case Disconnected:
                    // 链接断开,或session迁移
                    System.out.println("ConfigClient watch------Connecting....");
                    break;
                case AuthFailed:
                    if (autoReconnected && thread.isAlive()) {
                        thread.interrupt();
                    }
                    throw new RuntimeException("ConfigClient watch------ZK Connection auth failed...");
                default:
                    break;
            }
        }
    }

 

方案2.创建监听线程定时主动轮询, 下面代码是基于线程主动轮询方式:

public ConfigManager(String connectString,boolean autoReconneted) {
    this.connectString=connectString;
    this.autoReconnected = autoReconneted;
    if (this.autoReconnected) {
        thread = new Thread(new FailureHandler());
        thread.setDaemon(true);
        thread.start();
    }else{
        try {
            // 回话重建等异常行为
            zkClient = new ZooKeeper(connectString, 3000, dw, false);
            System.out.println("Reconnected success!...");
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }
}
class FailureHandler implements Runnable {
    /**
     * zk故障担保线程,如果需要故障检测或者容错,请将此实例交付给单独线程执行
     * 比如:因为网络问题,zk实例将可能长时间处于无法链接状态,或者其它异常,导致zk实例化出错等
     */
    public void run() {
        try {
            int i = 0;
            int l = 100;// 每次重建,将时间延迟100ms
            while (true) {

                //System.out.println("Manager handler,running...tid: " + Thread.currentThread().getId());
                if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) {
                    lock.lock();
                    try {
                        // 回话重建等异常行为
                        zkClient = new ZooKeeper(connectString, 3000, dw, false);//Constants.connectString
                        System.out.println("Reconnected success!...");
                    } catch (Exception e) {
                        e.printStackTrace();
                        i++;
                        Thread.sleep(3000 + i * l);// 在zk环境异常情况下,每3秒重试一次
                    } finally {
                        lock.unlock();
                    }
                    continue;
                }

                if(zkClient.getState().isConnected()){
                    Thread.sleep(3000);// 如果被“中断”,直接退出
                    i = 0;
                }

            }
        } catch (InterruptedException e) {
            System.out.println("Exit...");
            if(zkClient != null){
                try{
                    zkClient.close();
                }catch(Exception ze){
                    ze.printStackTrace();
                }
            }
        }
    }
}

 

 

 

 

 

 

没有评论

发表评论

邮箱地址不会被公开。 必填项已用*标注