最近定位一个问题, 发现读取的ZK中存储的数据 不符合预期, 后来发现该Znode节点有2个 服务会有写入操作, 所以造成了数据重入;
因此需要一个分布式环境下的同步机制, 方案有很多, 最终选择使用Zookeeper实现 分布式锁的功能 ;
原理是利用ZK的顺序节点+watcher机制实现:
- 在zookeeper指定节点下创建临时顺序节点node_lock_rsX 作为 对资源X访问的Locks 的存放目录
- 当任务A要对资源X做写操作时, 在node_lock_rsX 节点下创建顺序临时子节点 lock_rsX_A_000x
- 任务A 对 node_lock_rsX 节点下所有子节点排序
- 任务A 判断本节点 lock_rsX_A_000x是不是第一个子节点(最小的节点),若是,则获取锁, 可以对资源X写入;若不是,则监听比该节点小的那个节点的删除事件
- 若监听事件生效,则回到第三步重新进行判断,直到获取到锁
方案1: 直接使用curator 客户端封装的分布式锁完成
非常简单
...
CuratorFramework client = CuratorFrameworkFactory.newClient("x.x.x.x:2181,y.y.y.y:2181,z.z.z.z:2181", retryPolicy);
client.start();
//创建分布式锁, 锁空间的根节点路径为/curator/lock
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
//获得了锁, 进行业务流程
mutex.acquire();
System.out.println("Enter mutex");
...
//完成业务流程, 释放锁
mutex.release();
//关闭客户端
client.close();
...
mutex.acquire函数的源码 核心代码实现,internalLockLoop 内是怎么判断锁以及阻塞等待的
/获得锁
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
//获取所有的子节点列表,并且按序号从小到大排序
List<String> children = getSortedChildren();
//根据序号判断当前子节点是否为最小子节点
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
//如果为最小子节点则认为获得锁
haveTheLock = true;
}
else
{
//否则获取前一个子节点
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
//这里使用对象监视器做线程同步,当获取不到锁时监听前一个子节点删除消息并且进行wait(),当前一个子节点删除(也就是锁释放)时,回调会通过notifyAll唤醒此线程,此线程继续自旋判断是否获得锁
synchronized(this)
{
try
{
//这里使用getData()接口而不是checkExists()是因为,如果前一个子节点已经被删除了那么会抛出异常而且不会设置事件监听器,而checkExists虽然也可以获取到节点是否存在的信息但是同时设置了监听器,这个监听器其实永远不会触发,对于zookeeper来说属于资源泄露
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
//如果设置了阻塞等待的时间
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // 等待时间到达,删除对应的子节点
break;
}
//等待相应的时间
wait(millisToWait);
}else
{
//永远等待
wait();
}
}catch ( KeeperException.NoNodeException e )
{
//上面使用getData来设置监听器时,如果前一个子节点已经被删除那么会抛出NoNodeException,只需要自旋一次即可,无需额外处理
}
}
}
}
方案2. 使用ZK自带的Client端实现(网上样例)
原理和上面的源码差不多
public class ZookeeperDistributedLock {
public final static Joiner j = Joiner.on("|").useForNull("");
//zk客户端
private ZooKeeper zk;
//zk是一个目录结构,root为最外层目录
private String root = "/locks";
//锁的名称
private String lockName;
//当前线程创建的序列node
private ThreadLocal<String> nodeId = new ThreadLocal<>();
//用来同步等待zkclient链接到了服务端
private CountDownLatch connectedSignal = new CountDownLatch(1);
private final static int sessionTimeout = 3000;
private final static byte[] data= new byte[0];
public ZookeeperDistributedLock(String config, String lockName) {
this.lockName = lockName;
try {
zk = new ZooKeeper(config, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 建立连接
if (event.getState() == Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
});
connectedSignal.await();
Stat stat = zk.exists(root, false);
if (null == stat) {
// 创建根节点
zk.create(root, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
class LockWatcher implements Watcher {
private CountDownLatch latch = null;
public LockWatcher(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted)
latch.countDown();
}
}
public void lock() {
try {
// 创建临时子节点
String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(j.join(Thread.currentThread().getName() + myNode, "created"));
// 取出所有子节点
List<String> subNodes = zk.getChildren(root, false);
TreeSet<String> sortedNodes = new TreeSet<>();
for(String node :subNodes) {
sortedNodes.add(root +"/" +node);
}
String smallNode = sortedNodes.first();
String preNode = sortedNodes.lower(myNode);
if (myNode.equals( smallNode)) {
// 如果是最小的节点,则表示取得锁
System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock"));
this.nodeId.set(myNode);
return;
}
CountDownLatch latch = new CountDownLatch(1);
Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同时注册监听。
// 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
if (stat != null) {
System.out.println(j.join(Thread.currentThread().getName(), myNode,
" waiting for " + root + "/" + preNode + " released lock"));
latch.await();// 等待,这里应该一直等待其他线程释放锁
nodeId.set(myNode);
latch = null;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void unlock() {
try {
System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock "));
if (null != nodeId) {
zk.delete(nodeId.get(), -1);
}
nodeId.remove();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
static int n = 500;
public static void main(String[] args) {
Runnable runnable = new Runnable() {
public void run() {
ZookeeperDistributedLock lock = null;
try {
lock = new ZookeeperDistributedLock("10.66.X.X:2181", "resouce1");
lock.lock();
System.out.println(--n);
System.out.println(Thread.currentThread().getName() + "正在运行");
} finally {
if (lock != null) {
lock.unlock();
}
}
}
};
for (int i = 0; i < 10; i++) {
Thread t = new Thread(runnable);
t.start();
}
}
}
没有评论