java连接zookeeper的实现示例

 更新时间:2023年11月01日 11:44:09   作者:曹朋羽  
ZooKeeper官方提供了Java API,可以通过Java代码来连接zookeeper服务进行操作,本文就来介绍一下java连接zookeeper的实现示例,具有一定的参考价值,感兴趣的可以了解一下

API

ZooKeeper官方提供了Java API,可以通过Java代码来连接zookeeper服务进行操作。可以连接、创建节点、获取节点数据、监听节点变化等操作,具体有以下几个重要的类:

  • ZooKeeper:ZooKeeper类是Java API的核心类,用于与ZooKeeper服务器建立连接,并提供了一系列方法来操作ZooKeeper的节点。
  • Watcher:Watcher是ZooKeeper的一个回调接口,当节点发生变化时会调用相应的方法进行通知。
  • CreateMode:CreateMode枚举类定义了节点的类型,包括永久节点、临时节点、顺序节点和临时顺序节点。
  • Stat:Stat类表示节点的元数据信息,比如修改版本、数据长度、子节点数量等。

添加依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.7.2</version>
</dependency>

操作例子

String host = "localhost:2181";
//建立连接
zooKeeper = new ZooKeeper(host, 2000, null);
String path = "/test";
Watcher watcher = new Watcher() {
  @Override
  public void process(WatchedEvent watchedEvent) {
    System.out.println("Node changed: " + watchedEvent.getPath());
    System.out.println(watchedEvent);
  }
};
//获取节点状态 如果不存在返回null
Stat stat = zooKeeper.exists(path, false);
if(null != stat){
	System.out.println(stat.getCzxid()+"-"+stat.getAversion());
}

//创建节点 包含版本、时间、数据长度等信息
zooKeeper.create(path,"123".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//添加watcher
zooKeeper.addWatch(path, watcher, AddWatchMode.PERSISTENT);

//获取节点数据
byte[] data = zooKeeper.getData(path, false, null);
System.out.println(new String(data));

zooKeeper.create(path+"/1","child1".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
//获取子节点
List<String> children = zooKeeper.getChildren(path, false);
System.out.println("childs size:"+children.size());
//删除子节点
zooKeeper.delete(path+"/1",-1);
zooKeeper.close();

zkClient

zkClient封装了zookeeper的官方api,简化了一些繁琐的操作,并提供了一些额外的功能,提高了开发效.

添加依赖

<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.11</version>
</dependency>

zkclient对节点数据的操作进行了序列化, 这里先准备一个string类型的序列化类。需要实现ZkSerializer接口

public class ZkStringSerializer implements ZkSerializer {
    @Override
    public byte[] serialize(Object o) throws ZkMarshallingError {
        return String.valueOf(o).getBytes();
    }

    @Override
    public Object deserialize(byte[] bytes) throws ZkMarshallingError {
        return new String(bytes);
    }
}

基本操作

ZkClient zkClient = new ZkClient("localhost:2181");
//自定义序列化 否则报错
zkClient.setZkSerializer(new ZkStringSerializer());
String path = "/test";
//判断节点是否存在
boolean exist = zkClient.exists(path);
System.out.println(exist);
if(!exist){//创建节点
    zkClient.create(path,"123", CreateMode.PERSISTENT);
}
//读取节点数据
System.out.println((String) zkClient.readData(path));
zkClient.writeData(path,"456");//设置节点数据
System.out.println((String) zkClient.readData(path));
zkClient.delete(path);//删除节点

zkClient.close();

节点变化事件

String path = "/test";
/**
 * 节点变化事件
 * 只监听节点增减,不监听数据变化事件
 */
zkClient.subscribeChildChanges(path, new IZkChildListener() {
    @Override
    public void handleChildChange(String parentPath, List<String> children) throws Exception {
        System.out.println("节点"+parentPath+"发生变化");
        System.out.println(children);
    }
});
//节点操作,观察handleChildChange接收到对应事件
Thread.sleep(2000);
zkClient.createPersistent(path);
Thread.sleep(2000);
zkClient.createPersistent(path+"/child1");
Thread.sleep(2000);
zkClient.writeData(path+"/child1","123");
Thread.sleep(2000);
zkClient.delete(path+"/child1");
Thread.sleep(2000);
zkClient.delete(path);
Thread.sleep(100000);

节点数据变化事件

    String path = "/test";
    /**
     * 节点变化事件,只检测当前节点,感知不到其子节点
     * 节点被删除或节点数据变化
     */
    zkClient.subscribeDataChanges(path, new IZkDataListener() {
        @Override
        public void handleDataChange(String s, Object o) throws Exception {
            System.out.println("节点:"+s+"数据变为:"+o);
        }

        @Override
        public void handleDataDeleted(String s) throws Exception {
            System.out.println("节点:"+s+"删除");
        }
    });

    Thread.sleep(2000);
    zkClient.createPersistent(path);
    Thread.sleep(2000);
    zkClient.createPersistent(path+"/child1");
    Thread.sleep(2000);
    zkClient.delete(path+"/child1");
    Thread.sleep(2000);
    zkClient.writeData(path,"123");
    Thread.sleep(2000);
    zkClient.delete(path);
    Thread.sleep(100000);
}

Curator

curator是另一个java连接zookeeper类库。功能更加强大。提供了连接重试、分布式锁、选举、队列等多种实际场景的用例。这里先简单搞个使用例子。

添加依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.1.0</version>
</dependency>

curator-framework是基础的依赖,一些特定的使用方式需要添加不同的依赖,有curator-recipes、curator-x-discovery、curator-x-async等。

基本操作

//创建连接
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
String path = "/test";
client.checkExists().forPath(path);//判断是否存在
client.create().forPath(path, "123".getBytes());//创建节点
byte[] data = client.getData().forPath(path);//获取数据
System.out.println(new String(data));
client.setData().forPath(path, "456".getBytes());//设置数据
client.delete().forPath(path);//删除节点

client.close();

节点监听

CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
String path = "/test";
NodeCache  nodeCache = new NodeCache(client,path);
//添加监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        ChildData data = nodeCache.getCurrentData();
        if (data != null) {
            System.out.println("Node changed: " + data.getPath() + ", value: " + new String(data.getData()));
        } else {
            System.out.println("Node deleted: " + nodeCache.getPath());
        }
    }
});
nodeCache.start();
client.create().forPath(path);
client.setData().forPath(path, "123".getBytes());
client.delete().forPath(path);
client.close();

这里NodeCache被标识@Deprecated,也不知道被什么方式代替了,后面再研究。先简单使用。

到此这篇关于java连接zookeeper的实现示例的文章就介绍到这了,更多相关java连接zookeeper内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java异常(Exception)处理机制详解

    java异常(Exception)处理机制详解

    这篇文章主要介绍了java异常(Exception)处理机制详解的相关资料,主要介绍异常的定义及使用方法,需要的朋友可以参考下
    2017-03-03
  • @RequestParam 参数偶尔丢失的解决

    @RequestParam 参数偶尔丢失的解决

    这篇文章主要介绍了@RequestParam 参数偶尔丢失的解决,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • Java substring方法实现原理解析

    Java substring方法实现原理解析

    这篇文章主要介绍了Java substring方法实现原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-05-05
  • Maven中央仓库发布的实现方法

    Maven中央仓库发布的实现方法

    最近做了个项目,希望能够上传到maven中央仓库,给更多的人使用,于是就产生了这次项目发布经历。感兴趣的可以一起来参考一下
    2021-06-06
  • Java实现读取SFTP服务器指定目录文件的方法

    Java实现读取SFTP服务器指定目录文件的方法

    SFTP是一种在安全通道上传输文件的协议,它是基于SSH(Secure Shell)协议的扩展,用于在客户端和服务器之间进行加密的文件传输,这篇文章主要介绍了Java实现读取SFTP服务器指定目录文件,感兴趣的朋友跟随小编一起看看吧
    2023-08-08
  • Spring--国内Java程序员用得最多的框架

    Spring--国内Java程序员用得最多的框架

    前几年面试最常问的且可以顺利拿到高薪的技能是Spring,随着Spring体系的壮大,除非你在简历上添加Spring Boot和Spring Cloud的技能,才可以打动面试官,而现在,除非是Spring全家桶的实战经验,否则难以让面试官高看
    2021-06-06
  • 通过一个命令轻松切换Java的版本

    通过一个命令轻松切换Java的版本

    这篇文章主要给大家介绍了如何通过一个命令轻松实现切换Java的版本,通过本文介绍的方法,大家就可以将jdk版本之间轻松切换,需要的朋友可以参考学习,下面跟着小编一起来看看吧。
    2017-05-05
  • MyBatis传递多个参数方式

    MyBatis传递多个参数方式

    这篇文章主要介绍了MyBatis传递多个参数方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-02-02
  • elk之实现在kibana高效精准查询日志

    elk之实现在kibana高效精准查询日志

    这篇文章主要介绍了elk之实现在kibana高效精准查询日志方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-05-05
  • 快速了解Spring Boot

    快速了解Spring Boot

    这篇文章主要介绍了快速了解Spring Boot,介绍了其环境准备,URL中的变量以及模板渲染等内容,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11

最新评论