本文主要介绍使用Zookeeper提供的原生API来操作Zookeeper,文中所使用到的软件版本:Java 1.8.0_191、Zookeeper 3.6.0、Junit 4.13。
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
package com.inspur.demo.general.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
/**
* Zookeeper基本操作列子
*/
public class ZookeeperCase {
//Zookeeper地址,集群多个地址间用逗号分隔,如:10.49.196.10:2181,10.49.196.11:2181,10.49.196.12:2181
private static String connectString = "10.49.196.10:2181";
private static int sessionTimeout = 2 * 1000;
private ZooKeeper zooKeeper;
@Before
public void before() {
try {
zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
}
});
System.out.println(zooKeeper.getState());
} catch (IOException e) {
e.printStackTrace();
}
}
@After
public void after() throws Exception {
zooKeeper.close();
}
/**
* 创建节点
*/
@Test
public void create() throws Exception {
/*
* 同步创建持久节点,ACL为world:anyone:cdrwa
* 等同于该命令:create /javatest/node1 test world:anyone:cdrwa
*/
zooKeeper.create("/javatest/node1", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
/*
* 同步创建持久节点,ACL为world:anyone:cr
* 等同于该命令:create /javatest/node2 test world:anyone:cr
*/
zooKeeper.create("/javatest/node2", "test".getBytes(), Collections.singletonList(new ACL((ZooDefs.Perms.CREATE + ZooDefs.Perms.READ), ZooDefs.Ids.ANYONE_ID_UNSAFE)), CreateMode.PERSISTENT);
/*
* 异步创建临时顺序节点,ACL为ip:127.0.0.1:c
* 等同于该命令:create -s -e /javatest/node3 test ip:127.0.0.1:c
*/
CountDownLatch counter = new CountDownLatch(1);
zooKeeper.create("/javatest/node3", "test".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.CREATE, new Id("ip", "127.0.0.1"))), CreateMode.EPHEMERAL_SEQUENTIAL
,new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ", name=" + name);
counter.countDown();
}
}, "上下文对象,异步回调时会传递给callback");
counter.await();
/*
* 同步创建持久节点,ACL为digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
* 等同于该命令:create /javatest/node4 test digest:jack:tgi9UCnyPo5FJjVylKr05nAlWeg=:cdrwa
* 添加认证用户(addauth digest jack:123456)后才能访问/javatest/node4
*/
zooKeeper.create("/javatest/node4", "test".getBytes(), Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("digest", "jack:tgi9UCnyPo5FJjVylKr05nAlWeg="))) , CreateMode.PERSISTENT);
/*
* 同步创建顺序持久节点,ACL为world:anyone:cdrwa,存活时间为5秒
* 等同于该命令:create -s -t 5000 /javatest/node5 test
*/
Stat stat = new Stat();
zooKeeper.create("/javatest/node5", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, stat, 5000);
System.out.println(stat);
}
/**
* 获取节点数据
* @throws Exception
*/
@Test
public void getData() throws Exception {
//同步读取数据
Stat stat = new Stat();
byte[] data = zooKeeper.getData("/javatest/node1", false, stat);
System.out.println(new String(data));
System.out.println(stat);
//异步读取数据
zooKeeper.addAuthInfo("digest", "jack:123456".getBytes());
CountDownLatch counter = new CountDownLatch(1);
zooKeeper.getData("/javatest/node4", false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
String s = "";
if (data != null) {
s = new String(data);
}
System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",data=" + s + ",stat=" + stat);
counter.countDown();
}
}, "上下文对象,异步回调时会传递给callback");
counter.await();
}
@Test
public void setData() throws Exception {
//同步设置数据,version为-1表示匹配任何版本
Stat stat = zooKeeper.setData("/javatest/node1", "test2".getBytes(), -1);
System.out.println(stat);
//异步设置数据
zooKeeper.addAuthInfo("digest", "jack:123456".getBytes());
CountDownLatch counter = new CountDownLatch(1);
zooKeeper.setData("/javatest/node4", "test2".getBytes(), -1, new AsyncCallback.StatCallback(){
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc=" + rc + ",path=" + path + ",stat=" + stat);
counter.countDown();
}
}, "上下文对象,异步回调时会传递给callback");
counter.await();
}
@Test
public void delete() throws Exception {
//同步删除数据
zooKeeper.delete("/javatest/node1", -1);
//异步删除数据
CountDownLatch counter = new CountDownLatch(1);
zooKeeper.delete("/javatest/node2", -1, new AsyncCallback.VoidCallback(){
@Override
public void processResult(int rc, String path, Object ctx) {
System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx);
counter.countDown();
}
}, "上下文对象,异步回调时会传递给callback");
counter.await();
}
}
DataMonitor类实现对节点的监控,节点有变化时会回调DataMonitorListener.process方法,该方法由调用方根据业务来实现;WatcherCase类传入需要的参数来启动DataMonitor。
该例子是根据 官网例子 改造而来,相较官网更简单了些。
package com.inspur.demo.general.zookeeper;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
public class DataMonitor implements Runnable {
private ZooKeeper zk;
private DataMonitorListener listener;
/**
* 节点变化时会回调该方法,把监控变化类型及新数据带过来
*/
public interface DataMonitorListener {
void process(WatchedEvent event, byte[] data);
}
public DataMonitor(String hostPort, String znode, DataMonitorListener listener) throws Exception {
this.listener = listener;
AsyncCallback.StatCallback callback = new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
System.out.println("rc=" + rc + ",path=" + path + ",ctx=" + ctx + ",stat=" + stat);
switch (rc) {
case KeeperException.Code.Ok:
case KeeperException.Code.NoNode:
return;
case KeeperException.Code.SessionExpired:
case KeeperException.Code.NoAuth:
close();
return;
default:
zk.exists(znode, true, this, null);
return;
}
}
};
//监视器
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println(event);
if (event.getType() == Event.EventType.None) {
switch (event.getState()) {
case SyncConnected:
break;
case Expired:
close();
break;
}
} else {
try {
byte[] bytes = zk.getData(event.getPath(), false, null);
listener.process(event, bytes);
} catch (Exception e) {
e.printStackTrace();
}
if (event.getPath() != null && event.getPath().equals(znode)) {
//再次监控
zk.exists(znode, true, callback, null);
}
}
}
};
zk = new ZooKeeper("10.49.196.10:2181", 20000, watcher);
zk.exists(znode, true, callback, null);
}
@Override
public void run() {
try {
synchronized (this) {
wait();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void close() {
synchronized (this) {
notifyAll();
}
}
}
package com.inspur.demo.general.zookeeper;
import org.apache.zookeeper.*;
/**
* 监视节点样例
*/
public class WatcherCase {
public static void main(String[] args) throws Exception {
DataMonitor.DataMonitorListener listener = new DataMonitor.DataMonitorListener() {
@Override
public void process(WatchedEvent event, byte[] data) {
//todo:根据实际情况处理
if (event.getType() == Watcher.Event.EventType.NodeDataChanged) {
System.out.println(new String(data));
}
}
};
new DataMonitor("10.49.196.10:2181", "/watchtest", listener).run();
}
}