Java Zookeeper分布式分片算法超详细讲解流程

 更新时间:2023年03月01日 10:14:11   作者:Redick01  
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等

背景

公司的一个服务需要做类似于分片的逻辑,一开始服务基于传统部署方式通过本地配置文件配置的方式就可以指定该机器服务的分片内容如:0,1,2,3,随着系统的升级迭代,该服务进行了容器化部署,所以原来基于本地配置文件各自配置分片数据的方式就不适用了,原来的部署方式使得服务是有状态,是一种非云原生的方式,所以该服务要重新设计实现一套分布式服务分片逻辑。

技术方案

分布式协调中间件

要实现分布式服务分片的能力,需要有一个分布式中间件,如:RedisMysqlZookeeper等等都可以,我们选用Zookeeper

基于Zookeeper的技术方案

使用Zookeeper主要是基于Zookeeper的临时节点和节点变化监听机制,具体的技术设计如下:

服务注册目录设计

Zookeeper的数据存储结构类似于目录,服务注册后的目录类似如下结构:

解释下该目录结构,首先/xxxx/xxxx/sharding是区别于其他业务的的目录,该目录节点是持久的,service是服务目录,标识一个服务,该节点也是持久的,ip1ip2是该服务注册到Zookeeper的机器列表节点,该节点是临时节点。

/xxxx/xxxx/sharding/service/ip1
-----|----|--------|-------/ip2

服务分片处理流程

  • 服务启动,创建CuratorFramework客户端,设置客户端连接状态监听;
  • Zookeeper注册该机器的信息,这里设计简单,机器信息就是ip地址;
  • 注册机器信息后,从Zookeeper获取所有注册信息;
  • 根据Zookeeper获取的所有注册机器信息根据分片算法进行分片计算。

编码实现

ZookeeperConfig

Zookeeper的配置信息

@Data
public class ZookeeperConfig {
    /**
     * zk集群地址
     */
    private String zkAddress;
    /**
     * 注册服务目录
     */
    private String nodePath;
    /**
     * 分片的服务名
     */
    private String serviceName;
    /**
     * 分片总数
     */
    private Integer shardingCount;
    public ZookeeperConfig(String zkAddress, String nodePath, String serviceName, Integer shardingCount) {
        this.zkAddress = zkAddress;
        this.nodePath = nodePath;
        this.serviceName = "/" + serviceName;
        this.shardingCount = shardingCount;
    }
    /**
     * 等待重试的间隔时间的初始值.
     * 单位毫秒.
     */
    private int baseSleepTimeMilliseconds = 1000;
    /**
     * 等待重试的间隔时间的最大值.
     * 单位毫秒.
     */
    private int maxSleepTimeMilliseconds = 3000;
    /**
     * 最大重试次数.
     */
    private int maxRetries = 3;
    /**
     * 会话超时时间.
     * 单位毫秒.
     */
    private int sessionTimeoutMilliseconds;
    /**
     * 连接超时时间.
     * 单位毫秒.
     */
    private int connectionTimeoutMilliseconds;
}

InstanceInfo注册机器

@AllArgsConstructor
@EqualsAndHashCode()
public class InstanceInfo {
    private String ip;
    public String getInstance() {
        return ip;
    }
}

ZookeeperShardingService分片服务

@Slf4j
public class ZookeeperShardingService {
    public final Map<String, List<Integer>> caches = new HashMap<>(16);
    private final CuratorFramework client;
    private final ZookeeperConfig zkConfig;
    private final ShardingStrategy shardingStrategy;
    private final InstanceInfo instanceInfo;
    private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(1);
    public ZookeeperShardingService(ZookeeperConfig zkConfig, ShardingStrategy shardingStrategy) {
        this.zkConfig = zkConfig;
        log.info("开始初始化zk, ip列表是: {}.", zkConfig.getZkAddress());
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(zkConfig.getZkAddress())
                .retryPolicy(new ExponentialBackoffRetry(zkConfig.getBaseSleepTimeMilliseconds(), zkConfig.getMaxRetries(), zkConfig.getMaxSleepTimeMilliseconds()));
        if (0 != zkConfig.getSessionTimeoutMilliseconds()) {
            builder.sessionTimeoutMs(zkConfig.getSessionTimeoutMilliseconds());
        }
        if (0 != zkConfig.getConnectionTimeoutMilliseconds()) {
            builder.connectionTimeoutMs(zkConfig.getConnectionTimeoutMilliseconds());
        }
        this.shardingStrategy = shardingStrategy;
        HostInfo host = new HostInfo();
        this.instanceInfo = new InstanceInfo(host.getAddress());
        client = builder.build();
        client.getConnectionStateListenable().addListener(new ConnectionListener());
        client.start();
        try {
            COUNT_DOWN_LATCH.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 注册服务节点监听
        registerPathChildListener(zkConfig.getNodePath() + zkConfig.getServiceName(), new ChildrenPathListener());
        try {
            if (!client.blockUntilConnected(zkConfig.getMaxSleepTimeMilliseconds() * zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) {
                client.close();
                throw new KeeperException.OperationTimeoutException();
            }
        } catch (final Exception ex) {
            ex.printStackTrace();
            throw new RuntimeException(ex);
        }
    }
    /**
     * 子节点监听器
     * @param nodePath 主节点
     * @param listener 监听器
     */
    private void registerPathChildListener(String nodePath, PathChildrenCacheListener listener) {
        try {
            // 1. 创建一个PathChildrenCache
            PathChildrenCache pathChildrenCache = new PathChildrenCache(client, nodePath, true);
            // 2. 添加目录监听器
            pathChildrenCache.getListenable().addListener(listener);
            // 3. 启动监听器
            pathChildrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
        } catch (Exception e) {
            log.error("注册子目录监听器出现异常,nodePath:{}",nodePath,e);
            throw new RuntimeException(e);
        }
    }
    /**
     * 服务启动,注册zk节点
     * @throws Exception 异常
     */
    private void zkOp() throws Exception {
        // 是否存在ruubypay-sharding主节点
        if (null == client.checkExists().forPath(zkConfig.getNodePath())) {
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath(), Hashing.sha1().hashString("sharding", Charsets.UTF_8).toString().getBytes());
        }
        // 是否存服务主节点
        if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName())) {
            // 创建服务主节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
        }
        // 检查是否存在临时节点
        if (null == client.checkExists().forPath(zkConfig.getNodePath() + zkConfig.getServiceName() + "/" + instanceInfo.getInstance())) {
            System.out.println(zkConfig.getNodePath() + zkConfig.getServiceName() +  "/" + instanceInfo.getInstance());
            // 创建临时节点
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(zkConfig.getNodePath() + zkConfig.getServiceName() +
                    "/" + instanceInfo.getInstance(), zkConfig.getShardingCount().toString().getBytes(StandardCharsets.UTF_8));
        }
        shardingFromZk();
    }
    /**
     * 从zk获取机器列表并进行分片
     * @throws Exception 异常
     */
    private void shardingFromZk() throws Exception {
        // 从 serviceName 节点下获取所有Ip列表
        final GetChildrenBuilder childrenBuilder = client.getChildren();
        final List<String> instanceList = childrenBuilder.watched().forPath(zkConfig.getNodePath() + zkConfig.getServiceName());
        List<InstanceInfo> res = new ArrayList<>();
        instanceList.forEach(s -> {
            res.add(new InstanceInfo(s));
        });
        Map<InstanceInfo, List<Integer>> shardingResult = shardingStrategy.sharding(res, zkConfig.getShardingCount());
        // 先清一遍缓存
        caches.clear();
        shardingResult.forEach((k, v) -> {
            caches.put(k.getInstance().split("-")[0], v);
        });
    }
    /**
     * zk连接监听
     */
    private class ConnectionListener implements ConnectionStateListener {
        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.CONNECTED || newState == ConnectionState.LOST || newState == ConnectionState.RECONNECTED) {
                try {
                    zkOp();
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                } finally {
                    COUNT_DOWN_LATCH.countDown();
                }
            }
        }
    }
    /**
     * 子节点监听
     */
    private class ChildrenPathListener implements PathChildrenCacheListener {
        @Override
        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
            PathChildrenCacheEvent.Type type = event.getType();
            if (PathChildrenCacheEvent.Type.CHILD_ADDED == type || PathChildrenCacheEvent.Type.CHILD_REMOVED == type) {
                try {
                    shardingFromZk();
                } catch (Exception e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

分片算法

采用平均分配的算法

public interface ShardingStrategy {
    Map<InstanceInfo, List<Integer>> sharding(final List<InstanceInfo> list, Integer shardingCount);
}
public class AverageAllocationShardingStrategy implements ShardingStrategy {
    @Override
    public Map<InstanceInfo, List<Integer>> sharding(List<InstanceInfo> list, Integer shardingCount) {
        if (list.isEmpty()) {
            return null;
        }
        Map<InstanceInfo, List<Integer>> result = shardingAliquot(list, shardingCount);
        addAliquant(list, shardingCount, result);
        return result;
    }
    private Map<InstanceInfo, List<Integer>> shardingAliquot(final List<InstanceInfo> instanceInfos, final int shardingTotalCount) {
        Map<InstanceInfo, List<Integer>> result = new LinkedHashMap<>(shardingTotalCount, 1);
        int itemCountPerSharding = shardingTotalCount / instanceInfos.size();
        int count = 0;
        for (InstanceInfo each : instanceInfos) {
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(i);
            }
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }
    private void addAliquant(final List<InstanceInfo> instanceInfos, final int shardingTotalCount, final Map<InstanceInfo, List<Integer>> shardingResults) {
        int aliquant = shardingTotalCount % instanceInfos.size();
        int count = 0;
        for (Map.Entry<InstanceInfo, List<Integer>> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(shardingTotalCount / instanceInfos.size() * instanceInfos.size() + count);
            }
            count++;
        }
    }
}

总结

基于Zookeeper和简单的平均分配算法实现了一个简单的分布式分片服务,该分片服务目前满足公司需求,因为其简单,所以不一定满足其他场景,针对其他场景还需考虑其他因素,该示例供参考。

到此这篇关于Java Zookeeper分布式分片算法超详细讲解流程的文章就介绍到这了,更多相关Java Zookeeper分布式分片算法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java并发编程工具类JUC之ArrayBlockingQueue

    java并发编程工具类JUC之ArrayBlockingQueue

    类ArrayBlockingQueue是BlockingQueue接口的实现类,它是有界的阻塞队列,内部使用数组存储队列元素,通过代码给大家说明如何初始化一个ArrayBlockingQueue,并向其中添加一个对象,对java并发编程工具类ArrayBlockingQueue相关知识感兴趣的朋友一起看看吧
    2021-05-05
  • Spring Retry重试框架的使用讲解

    Spring Retry重试框架的使用讲解

    重试的使用场景比较多,比如调用远程服务时,由于网络或者服务端响应慢导致调用超时,此时可以多重试几次。用定时任务也可以实现重试的效果,但比较麻烦,用Spring Retry的话一个注解搞定所有,感兴趣的可以了解一下
    2023-01-01
  • Java二叉树的四种遍历(递归与非递归)

    Java二叉树的四种遍历(递归与非递归)

    这篇文章小编给大家分享的是Java二叉树的四种遍历,主要是递归与非递归,下面文章加u来详细介绍,感兴趣的小伙伴一起来学习吧
    2021-10-10
  • Spring Boot DevTools 全局配置学习指南

    Spring Boot DevTools 全局配置学习指南

    这篇文章主要介绍了Spring Boot DevTools 全局配置,注意包括直接重启项目与devtools重启的区别,DevTools配置,DevTools全局配置及trigger-file控制重启行为的相关知识,需要的朋友可以参考下
    2022-03-03
  • 解决springboot自定义注解AOP在controller上导致controller注入失败问题

    解决springboot自定义注解AOP在controller上导致controller注入失败问题

    这篇文章主要介绍了解决springboot自定义注解AOP在controller上导致controller注入失败问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-10-10
  • Java的内存分配与回收策略详解

    Java的内存分配与回收策略详解

    这篇文章主要介绍了Java的内存分配与回收策略详解,对象的内存分配,就是在堆上分配,对象主要分配在新生代的 Eden 区上,少数情况下可能直接分配在老年代,分配规则不固定,取决于当前使用的垃圾收集器组合以及相关的参数配置,需要的朋友可以参考下
    2023-08-08
  • 深入了解Spring中的@Autowired和@Resource注解

    深入了解Spring中的@Autowired和@Resource注解

    Spring中的@Autowired和@Resource注解都可以实现依赖注入,但使用方式、注入策略和适用场景略有不同。本文将深入探讨这两种注解的原理、使用方法及优缺点,帮助读者更好地理解和运用Spring依赖注入机制
    2023-04-04
  • SpringBoot 2.0 整合sharding-jdbc中间件实现数据分库分表

    SpringBoot 2.0 整合sharding-jdbc中间件实现数据分库分表

    这篇文章主要介绍了SpringBoot 2.0 整合sharding-jdbc中间件,实现数据分库分表,本文图文并茂给大家介绍的非常详细,具有一定的参考借鉴价值 ,需要的朋友可以参考下
    2019-06-06
  • 四步轻松搞定java web每天定时执行任务

    四步轻松搞定java web每天定时执行任务

    本篇文章主要介绍了四步轻松搞定java web每天定时执行任务,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-01-01
  • java删除文件和文件夹具体实现

    java删除文件和文件夹具体实现

    这篇文章介绍了java删除文件和文件夹具体实现,有需要的朋友可以参考一下
    2013-10-10

最新评论