springboot如何配置多kafka

 更新时间:2024年01月30日 16:44:57   作者:是菜菜的小严惜哎  
这篇文章主要介绍了springboot如何配置多kafka问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

springboot配置多kafka

kafka,说起这个玩意,大家应该都不陌生,不知道啥是kafka的直接去搜索

就像MySQL的使用一样,我们在用kafka的时候,也会碰到需要使用多个kafka的情况,比如我从kafkaA的一个topic里消费出数据,进行一次处理,然后我要写入到kafkaB的topic里

从网上找了很多配置多kafka的教程,但是都不大好使,后来还是找到了,加上我自己改了点点东西,贴出来和大家分享一下~

首先是pom文件,kafka的依赖

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置文件

spring:
  kafka:
    listener:
      concurrency: 10
    one:
      # kafka地址
      bootstrap-servers: 192.168.217.117:9092
      producer:
        # 生产者每次发送消息的时间间隔(毫秒)
        linger-ms: 5000
        # 单条消息最大值(字节)
        max-request-size: 16384
        # 每次批量发送消息的数量
        batch-size: 16384
        # 缓存区大小
        buffer-memory: 33554432
        # 队列
        topic: test
      consumer:
        # 是否自动提交
        enable-auto-commit: true
        # 队列
        topic: topic
        # group ID
        group-id: consumer
    two:
      # kafka地址
      bootstrap-servers: 192.168.217.160:9092
      producer:
        # 生产者每次发送消息的时间间隔(毫秒)
        linger-ms: 100
        # 单条消息最大值(单位为字节,且大小指的是经过序列化后的大小)
        max-request-size: 16384
        # 每次批量发送消息的数量
        batch-size: 16384
        # 缓存区大小
        buffer-memory: 33554432
        # 队列
        topic: test
      consumer:
        # 是否自动提交
        enable-auto-commit: true
        # 队列
        topic: test
        # group ID
        group-id: consumer

有了配置文件当然就要读取配置文件了

先读取第一个kafka

@EnableKafka
@Configuration
public class KafkaConfigOne {
 
    @Value("${spring.kafka.one.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.one.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.one.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.one.producer.linger-ms}")
    private Integer lingerMs;
    @Value("${spring.kafka.one.producer.max-request-size}")
    private Integer maxRequestSize;
    @Value("${spring.kafka.one.producer.batch-size}")
    private Integer batchSize;
    @Value("${spring.kafka.one.producer.buffer-memory}")
    private Integer bufferMemory;
 
 
    @Bean
    public KafkaTemplate<String, String> kafkaOneTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
 
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaOneContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
 
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
 
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
 
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
 

接着读取第二个kafka

@EnableKafka
@Configuration
public class KafkaConfigTwo {
 
    @Value("${spring.kafka.two.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.two.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;
    @Value("${spring.kafka.two.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.two.producer.linger-ms}")
    private Integer lingerMs;
    @Value("${spring.kafka.two.producer.max-request-size}")
    private Integer maxRequestSize;
    @Value("${spring.kafka.two.producer.batch-size}")
    private Integer batchSize;
    @Value("${spring.kafka.two.producer.buffer-memory}")
    private Integer bufferMemory;
 
 
    @Bean
    public KafkaTemplate<String, String> kafkaTwoTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
 
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTwoContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
 
    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
 
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
 
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
 

PS:路径什么的大家可以改哈

使用:

     @Autowired
    @Qualifier("kafkaTwoTemplate")
    private KafkaTemplate kafkaTwoTemplate;
 
kafkaTwoTemplate.send("topic", "message");

直接注入使用就可以,但是,在这儿注意,因为配置了多个kafka,所以需要进行区分,此处我使用@Autowired和@Qualifier连用,大家也可以使用@DataSource,这个就是多数据源的注解而已,无所谓,按照个人习惯进行使用就OK

当然,还有消费者

 @KafkaListener(topics = {"#{'${spring.kafka.two.consumer.topic}'}"}, containerFactory = "kafkaTwoContainerFactory")
    public void listenerTwo (String data) {
        System.out.println(data);
    }

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 聊聊Spring Boot 如何集成多个 Kafka

    聊聊Spring Boot 如何集成多个 Kafka

    这篇文章主要介绍了Spring Boot 集成多个 Kafka的相关资料,包括配置文件,生成者和消费者配置过程,本文通过实例代码给大家介绍的非常详细,需要的朋友参考下吧
    2023-10-10
  • java定义二维数组的几种写法(小结)

    java定义二维数组的几种写法(小结)

    下面小编就为大家带来一篇java定义二维数组的几种写法(小结)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-10-10
  • Kotlin null的处理详解

    Kotlin null的处理详解

    这篇文章主要介绍了Kotlin null的处理详解的相关资料,需要的朋友可以参考下
    2017-06-06
  • Java8 Stream流根据多个字段去重

    Java8 Stream流根据多个字段去重

    这篇文章主要介绍了Java8 Stream流根据多个字段去重,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-05-05
  • Spring中@ConditionalOnProperty注解的作用详解

    Spring中@ConditionalOnProperty注解的作用详解

    这篇文章主要介绍了Spring中@ConditionalOnProperty注解的作用详解,@ConditionalOnProperty注解主要是用来判断配置文件中的内容来决定配置类是否生效用的,如果条件不匹配,则配置类不生效,需要的朋友可以参考下
    2024-01-01
  • Java中大数据推荐算法使用场景分析

    Java中大数据推荐算法使用场景分析

    在Java中实现大数据推荐算法时,通常会使用一些开源的机器学习库,如Apache Mahout、Weka、DL4J(DeepLearning4j,用于深度学习)或者Spark MLlib(用于在Spark集群上运行),这篇文章主要介绍了Java中可以用的大数据推荐算法,需要的朋友可以参考下
    2024-06-06
  • java实现简单的加减乘除计算器

    java实现简单的加减乘除计算器

    这篇文章主要为大家详细介绍了java实现简单的加减乘除计算器,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-09-09
  • SpringBoot整合EasyExcel实现导入导出数据

    SpringBoot整合EasyExcel实现导入导出数据

    这篇文章主要为大家详细介绍了如何使用Vue、SpringBoot和EasyExcel实现导入导出数据功能,感兴趣的小伙伴可以跟随小编一起学习一下
    2022-05-05
  • 一分钟入门Java Spring Boot彻底解决SSM配置问题

    一分钟入门Java Spring Boot彻底解决SSM配置问题

    Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置。通过这种方式,Spring Boot致力于在蓬勃发展的快速应用开发领域成为领导者
    2021-10-10
  • Spring Data Jpa返回自定义对象的3种方法实例

    Spring Data Jpa返回自定义对象的3种方法实例

    在使用Spring Data Jpa框架时,根据业务需求我们通常需要进行复杂的数据库查询,下面这篇文章主要给大家介绍了关于Spring Data Jpa返回自定义对象的3种方法,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-08-08

最新评论