SpringBoot整合Zookeeper详细教程

 更新时间:2022年12月23日 14:18:00   作者:擅长开发Bug的Mr.NaCl  
Curator是Netflix公司开源的⼀套zookeeper客户端框架,Curator是对Zookeeper⽀持最好的客户端框架。Curator封装了⼤部分Zookeeper的功能,⽐如Leader选举、分布式锁等,减少了技术⼈员在使⽤Zookeeper时的底层细节开发⼯作

一、引言

使用原生的zookeeper时候会遇到watcher一次注册生效一次等情况,因此使用curator

curator是Netflix公司开源的一个 zookeeper客户端原生API接口上进行了包装,解决了很多问题并提供Zookeeper分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等的应用的抽象封装

二、引入依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>5.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>5.2.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.10</version>
</dependency>

三、编写客户端

要改Windows的host文件。host文件位置是C:\Windows\System32\drivers\etc

3.1、ZookeeperConfig

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZookeeperConfig {
    //集群地址,分割不能有空格 在程序里写connectString不可使用ip,必须使用主机名
    private String connectString = "master:2181,slave1:2181,slave2:2181";
    //连接超时时间
    private int sessionTimeout = 5000;
    //会话存活时间,根据业务灵活指定
    private Integer sessionTimeOut=5000;
    //重试机制时间参数
    private Integer sleepMsBetweenRetry=1000;
    //重试机制重试次数
    private Integer maxRetries=3;
    //命名空间(父节点名称)
    private String namespace="";
    /**
     - `session`重连策略
     - `RetryPolicy retry Policy = new RetryOneTime(3000);`
     - 说明:三秒后重连一次,只重连一次
     - `RetryPolicy retryPolicy = new RetryNTimes(3,3000);`
     - 说明:每三秒重连一次,重连三次
     - `RetryPolicy retryPolicy = new RetryUntilElapsed(1000,3000);`
     - 说明:每三秒重连一次,总等待时间超过个`10`秒后停止重连
     - `RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3)`
     - 说明:这个策略的重试间隔会越来越长
     - 公式:`baseSleepTImeMs * Math.max(1,random.nextInt(1 << (retryCount + 1)))`
     - `baseSleepTimeMs` = `1000` 例子中的值
     - `maxRetries` = `3` 例子中的值
     */
    @Bean("curatorClient")
    public CuratorFramework curatorClient() throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .connectionTimeoutMs(sessionTimeout)
                .sessionTimeoutMs(sessionTimeOut)
                //session重连策略
                .retryPolicy(new ExponentialBackoffRetry(sleepMsBetweenRetry,maxRetries))
                //设置命名空间 在操作节点的时候,会以这个为父节点
                .namespace(namespace)
                .build();
        client.start();
        //注册监听器
        ZookeeperWatches watches = new ZookeeperWatches(client);
        watches.znodeWatcher();
        watches.znodeChildrenWatcher();
        return client;
    }
}

3.2、ZookeeperWatches

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.zookeeper.data.Stat;
public class ZookeeperWatches {
    private CuratorFramework client;
    public ZookeeperWatches(CuratorFramework client) {
        this.client = client;
    }
    public void znodeWatcher() throws Exception {
        NodeCache nodeCache = new NodeCache(client, "/");
        nodeCache.start();
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("=======节点改变===========");
                String path = nodeCache.getPath();
                String currentDataPath = nodeCache.getCurrentData().getPath();
                String currentData = new String(nodeCache.getCurrentData().getData());
                Stat stat = nodeCache.getCurrentData().getStat();
                System.out.println("path:"+path);
                System.out.println("currentDataPath:"+currentDataPath);
                System.out.println("currentData:"+currentData);
            }
        });
        System.out.println("节点监听注册完成");
    }
    public void znodeChildrenWatcher() throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/",true);
        pathChildrenCache.start();
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("=======节点子节点改变===========");
                PathChildrenCacheEvent.Type type = event.getType();
                String childrenData = new String(event.getData().getData());
                String childrenPath = event.getData().getPath();
                Stat childrenStat = event.getData().getStat();
                System.out.println("子节点监听类型:"+type);
                System.out.println("子节点路径:"+childrenPath);
                System.out.println("子节点数据:"+childrenData);
                System.out.println("子节点元数据:"+childrenStat);
            }
        });
        System.out.println("子节点监听注册完成");
    }
}

3.3、ZookeeperController

@RestController
@RequestMapping(value = "/zookeeper")
public class ZookeeperController {
    @Resource(name = "curatorClient")
    private CuratorFramework client;
    @RequestMapping("/createZnode")
    public String createZnode(){
        String path = "/nacl";
        String data = "shuaige";
        List<ACL> aclList = new ArrayList<>();
        Id id = new Id("world", "anyone");
        aclList.add(new ACL(ZooDefs.Perms.ALL, id));
        try {
            client.create()
                    .creatingParentsIfNeeded()  //没有父节点时 创建父节点
                    .withMode(CreateMode.PERSISTENT)  //节点类型
                    .withACL(aclList)   //配置权限
                    .forPath(path, data.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
            return "节点创建失败"+e.getMessage();
        }
        return "节点创建成功";
    }
    @RequestMapping("/selectZnode")
    public String  selectZnode(){
        HashMap<String,String> hashMap=new HashMap();
        String path="/nacl";
        Stat stat;
        try {
            stat = client.checkExists().forPath(path);
            if (stat == null) {
                hashMap.put("Error","不存在该节点");
            }
            String dataString = new String(client.getData().forPath(path));
            hashMap.put(path, dataString);
            hashMap.put("stat", stat.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return JSON.toJSONString(hashMap);
    }
    @RequestMapping("/selectChildrenZnode")
    public String selectChildrenZnode(){
        Map<String, String> hashMap = new HashMap<>();
        String path="/";
        try {
            List<String> list = client.getChildren().forPath(path);
            for (String s : list) {
                String dataString = new String(client.getData().forPath(path+s));
                hashMap.put(path+s, dataString);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return JSON.toJSONString(hashMap);
    }
    @RequestMapping("/setData")
    public String setData() {
        String path="/nacl";
        String data="big";
        Integer version=0;
        HashMap<String,String> hashMap=new HashMap<>();
        try {
            Stat stat = client.setData().withVersion(version).forPath(path, data.getBytes());
            hashMap.put("success", "修改成功");
            hashMap.put("version", String.valueOf(stat.getVersion()));
        } catch (Exception e) {
            e.printStackTrace();
            hashMap.put("error", "修改失败:"+e.getMessage());
        }
        return JSON.toJSONString(hashMap);
    }
    @RequestMapping("/delete")
    public String delete() {
        HashMap<String,String> hashMap=new HashMap<>();
        String path="/nacl";
        String data="big";
        Integer version=1;
        try {
            client.delete().withVersion(version).forPath(path);
            hashMap.put("success", "删除成功");
        } catch (Exception e) {
            e.printStackTrace();
            hashMap.put("error", "删除失败:"+e.getMessage());
        }
        return JSON.toJSONString(hashMap);
    }
    @RequestMapping("/createAsyncZnode")
    public String createAsyncZnode(){
        String path = "/nacl";
        String data = "shuaige";
        try {
            client.create()
                    .creatingParentsIfNeeded()
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    //异步回调   增删改都有异步方法
                    .inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
                            System.out.println("异步回调--获取权限:"+client.getACL().forPath(path));
                            System.out.println("异步回调--获取数据:"+new String(client.getData().forPath(path)));
                            System.out.println("异步回调--获取事件名称:"+event.getName());
                            System.out.println("异步回调--获取事件类型:"+event.getType());
                        }
                    })
                    .forPath(path, data.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
            return "节点创建失败"+e.getMessage();
        }
        return "节点创建成功";
    }
}

到此这篇关于SpringBoot整合Zookeeper详细教程的文章就介绍到这了,更多相关SpringBoot整合Zookeeper内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 浅谈HTTP使用BASIC认证的原理及实现方法

    浅谈HTTP使用BASIC认证的原理及实现方法

    下面小编就为大家带来一篇浅谈HTTP使用BASIC认证的原理及实现方法。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-11-11
  • 深入解析Java编程中面向字节流的一些应用

    深入解析Java编程中面向字节流的一些应用

    这篇文章主要介绍了Java编程中面向字节流的一些应用,是Java入门学习中的基础知识,需要的朋友可以参考下
    2015-10-10
  • 剑指Offer之Java算法习题精讲链表与字符串及数组

    剑指Offer之Java算法习题精讲链表与字符串及数组

    跟着思路走,之后从简单题入手,反复去看,做过之后可能会忘记,之后再做一次,记不住就反复做,反复寻求思路和规律,慢慢积累就会发现质的变化
    2022-03-03
  • Java中的内存泄露问题和解决办法

    Java中的内存泄露问题和解决办法

    大家好,本篇文章主要讲的是Java中的内存泄露问题和解决办法,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下
    2022-01-01
  • Java集成Onlyoffice的示例代码及场景分析

    Java集成Onlyoffice的示例代码及场景分析

    这篇文章主要介绍了Java集成Onlyoffice的示例代码及场景分析,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2025-05-05
  • Java批量从svn导出多个项目代码实例

    Java批量从svn导出多个项目代码实例

    这篇文章主要介绍了java批量从svn导出多个项目代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-03-03
  • Java中ArrayList集合的遍历方式的多种方法

    Java中ArrayList集合的遍历方式的多种方法

    本文主要介绍了Java中ArrayList集合的遍历方式的多种方法,包括普通for循环、增强for循环、迭代器、ListIterator、Stream API和并行流,具有一定的参考价值,感兴趣的可以了解一下
    2025-05-05
  • Java报错状态码快速定位与解决方法

    Java报错状态码快速定位与解决方法

    在日常开发中Java程序员最头疼的问题之一就是遇到各种报错状态码,这些状态码就像谜语一样让人摸不着头脑,比如突然蹦出一个500或者404,新手可能会直接懵掉,别担心!这篇文章会带你彻底搞懂Java报错状态码的来龙去脉,需要的朋友可以参考下
    2025-05-05
  • java -D参数设置系统属性无效问题及解决

    java -D参数设置系统属性无效问题及解决

    这篇文章主要介绍了java -D参数设置系统属性无效问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12
  • Java 轻松掌握字符缓冲流的使用

    Java 轻松掌握字符缓冲流的使用

    这篇文章主要介绍了Java的字符缓冲流用法,字符缓冲流的用途很多,主要是几个构造方法的使用,在项目开发中经常会用到,需要的朋友参考下吧
    2022-04-04

最新评论