通过HA访问Hdfs的时候如何获取到活跃节点是一个稍稍有些麻烦的事情。
目前使用过两种方案:一是通过webhdfs接口逐一访问测试,找到状态为可用的节点;一是在zookeeper上直接获取当前活跃的节点。
简单说下第二种方案。ha的ActiveNode在zookeeper上的存储节点为:/hadoop-ha/dcnameservice/ActiveStandbyElectorLock。只需要通过ZooKeeper的API监听获取这个节点的信息即可。不过这个节点保存的信息不能当做字符串来读取,它是一个序列化后的对象,需要反序列化才能使用。
实现的代码如下:
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdfs.server.namenode.ha.proto.HAZKInfoProtos;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
public class ZooKeeperClient {
private static ZooKeeperzk;
private static Statstat = new Stat();
private static String HA_ACTIVE_NODE = "";
static {
try {
zk = new ZooKeeper(HDFS_HA_ZK_ADDRESS, 50000, new Watcher() {
@Override
public void process(WatchedEventevent) {
if (event.getType() == NodeDataChanged) {
update();
System.out.println("HA Address changed......");
}
}
});
} catch (IOException e) {
throw new RuntimeException(e);
}
update();
}
private static void update() {
try {
byte[] bytes = zk.getData(HDFS_HA_ZK_NODE, true, stat);
HAZKInfoProtos.ActiveNodeInfonodeInfo = HAZKInfoProtos.ActiveNodeInfo.parseFrom(bytes);
HA_ACTIVE_NODE = nodeInfo.getHostname();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
public static String getActiveNode() {
return HA_ACTIVE_NODE;
}
public static void main(String[] args) {
System.out.println(getActiveNode());
}
}
就这样。