Spring Boot集群管理工具KafkaAdminClient使用方法解析

 更新时间:2020年02月25日 10:25:56   作者:---WeiGeH  
这篇文章主要介绍了Spring Boot集群管理工具KafkaAdminClient使用方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

原理介绍

在Kafka官网中这么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具体的KafkaAdminClient包含了一下几种功能(以Kafka1.0.0版本为准):

  • 创建Topic:createTopics(Collection<NewTopic> newTopics)
  • 删除Topic:deleteTopics(Collection<String> topics)
  • 罗列所有Topic:listTopics()
  • 查询Topic:describeTopics(Collection<String> topicNames)
  • 查询集群信息:describeCluster()
  • 查询ACL信息:describeAcls(AclBindingFilter filter)
  • 创建ACL信息:createAcls(Collection<AclBinding> acls)
  • 删除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
  • 查询配置信息:describeConfigs(Collection<ConfigResource> resources)
  • 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
  • 修改副本的日志目录:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
  • 查询节点的日志目录信息:describeLogDirs(Collection<Integer> brokers)
  • 查询副本的日志目录信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
  • 增加分区:createPartitions(Map<String, NewPartitions> newPartitions)

其内部原理是使用Kafka自定义的一套二进制协议来实现,详细可以参见Kafka协议。主要实现步骤:

客户端根据方法的调用创建相应的协议请求,比如创建Topic的createTopics方法,其内部就是发送CreateTopicRequest请求。
客户端发送请求至Kafka Broker。

Kafka Broker处理相应的请求并回执,比如与CreateTopicRequest对应的是CreateTopicResponse。
客户端接收相应的回执并进行解析处理。

和协议有关的请求和回执的类基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是这些请求和回执类的两个基本父类。

代码如下

@Component
public class KafkaConfig{

   // 配置Kafka
  public Properties getProps(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
/*    props.put("retries", 2); // 重试次数
    props.put("batch.size", 16384); // 批量发送大小
    props.put("buffer.memory", 33554432); // 缓存大小,根据本机内存大小配置
    props.put("linger.ms", 1000); // 发送频率,满足任务一个条件发送*/
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }

}
@RestController
public class KafkaTopicManager {

  @Autowired
  private KafkaConfig kafkaConfig;

  @GetMapping("createTopic")
  public void createTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    NewTopic newTopic = new NewTopic("test1",4, (short) 1);
    Collection<NewTopic> newTopicList = new ArrayList<>();
    newTopicList.add(newTopic);
    adminClient.createTopics(newTopicList);

    adminClient.close();
  }
  @GetMapping("deleteTopic")
  public void deleteTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    adminClient.deleteTopics(Arrays.asList("test1"));
    adminClient.close();
  }
  @GetMapping("listAllTopic")
  public void listAllTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
    ListTopicsResult result = adminClient.listTopics();
    KafkaFuture<Set<String>> names = result.names();
    try {
      names.get().forEach((k)->{
        System.out.println(k);
      });
    } catch (InterruptedException | ExecutionException e) {
      e.printStackTrace();
    }
    adminClient.close();
  }
  @GetMapping("getTopic")
  public void getTopic(){
    AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());

    DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test"));

    Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values();

    if(values.isEmpty()){
      System.out.println("找不到描述信息");
    }else{
      for (KafkaFuture<TopicDescription> value : values) {
        System.out.println(value);
      }
    }
    adminClient.close();
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • java使用RSA加密方式实现数据加密解密的代码

    java使用RSA加密方式实现数据加密解密的代码

    这篇文章给大家分享java使用RSA加密方式实现数据加密解密,通过实例代码文字相结合给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友参考下
    2019-11-11
  • 通过反射实现Java下的委托机制代码详解

    通过反射实现Java下的委托机制代码详解

    这篇文章主要介绍了通过反射实现Java下的委托机制代码详解,具有一定借鉴价值,需要的朋友可以参考下。
    2017-12-12
  • MyBatis生成UUID的实现

    MyBatis生成UUID的实现

    这篇文章主要介绍了MyBatis生成UUID的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • JAVA使用ElasticSearch查询in和not in的实现方式

    JAVA使用ElasticSearch查询in和not in的实现方式

    今天小编就为大家分享一篇关于JAVA使用Elasticsearch查询in和not in的实现方式,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2018-12-12
  • 一篇文章教你如何用多种迭代写法实现二叉树遍历

    一篇文章教你如何用多种迭代写法实现二叉树遍历

    这篇文章主要介绍了C语言实现二叉树遍历的迭代算法,包括二叉树的中序遍历、先序遍历及后序遍历等,是非常经典的算法,需要的朋友可以参考下
    2021-08-08
  • MybatisPlus查询条件为空字符串或null问题及解决

    MybatisPlus查询条件为空字符串或null问题及解决

    这篇文章主要介绍了MybatisPlus查询条件为空字符串或null问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • java判断是否空最简单的方法

    java判断是否空最简单的方法

    在本篇文章里小编给大家整理的一篇关于java判断是否空最简单的方法,有兴趣的读者们可以参考下。
    2019-12-12
  • ShardingSphere数据分片算法及测试实战

    ShardingSphere数据分片算法及测试实战

    这篇文章主要为大家介绍了ShardingSphere数据分片算法及测试实战示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-03-03
  • SpringCloud Hystrix-Dashboard仪表盘的实现

    SpringCloud Hystrix-Dashboard仪表盘的实现

    这篇文章主要介绍了SpringCloud Hystrix-Dashboard仪表盘的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • 深度理解SpringMVC中的HandlerMapping

    深度理解SpringMVC中的HandlerMapping

    这篇文章主要介绍了深度理解SpringMVC中的HandlerMapping,HandlerMapping的作用根据request找到对应的处理器Handler,在HandlerMapping接口中有一个唯一的方法getHanler,需要的朋友可以参考下
    2023-09-09

最新评论