springboot项目配置多个kafka的示例代码

 更新时间:2023年04月24日 14:44:41   作者:CccccDi  
这篇文章主要介绍了springboot项目配置多个kafka,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

1.spring-kafka

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.3.5.RELEASE</version>
</dependency>

2.配置文件相关信息

kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
#可以并发消费的线程数 (通常与partition数量一致)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false
        
kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false

3.kafka配置类

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.group.id}")
    private String groupId;

    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Value("${kafka.consumer.enable.auto.commit}")
    private String autoCommit;

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;


    @Value("${kafka.consumer.group.id.pic}")
    private String groupIdPic;

    @Value("${kafka.consumer.concurrency.pic}")
    private int concurrencyPic;

    @Value("${kafka.consumer.enable.auto.commit.pic}")
    private String autoCommitPic;

    @Value("${kafka.bootstrap-servers.pic}")
    private String bootstrapServerPic;


    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        String bootstrapServers = bootstrapServer;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }

 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }




    @Bean
    public ConsumerFactory<String, String> consumerFactoryPic() {
        String bootstrapServers = bootstrapServerPic;
        Map<String, Object> configProps = new HashMap<>(16);
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
        return new DefaultKafkaConsumerFactory<>(configProps);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryPic());
        factory.setConcurrency(concurrencyPic);
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

4.消费主题消息

@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
    public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
        try {
            String jsonString = message.value();
            if (StringUtils.isNoneBlank(jsonString)) {
                log.info("消费:{}",jsonString);
                //TODO ....
            }
        } catch (Exception e) {
            log.error(" receive topic error ", e);
        } finally {
            ack.acknowledge();
        }
    }

@KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
    public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
        try {
            if (StringUtils.isNoneBlank(message.value())) {
                  //TODO ....
            }
        } catch (Exception e) {
            logger.error(" receive topic error ", e);
        } finally {
            ack.acknowledge();
        }
    }

到此这篇关于springboot项目配置多个kafka的文章就介绍到这了,更多相关springboot配置多个kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java 获取原始请求域名实现示例

    Java 获取原始请求域名实现示例

    这篇文章主要为大家介绍了Java 获取原始请求域名实现示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-12-12
  • Jenkins发送测试报告邮件过程详解

    Jenkins发送测试报告邮件过程详解

    这篇文章主要介绍了Jenkins发送测试报告邮件过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • 基于OpenCv与JVM实现加载保存图像功能(JAVA 图像处理)

    基于OpenCv与JVM实现加载保存图像功能(JAVA 图像处理)

    openCv有一个名imread的简单函数,用于从文件中读取图像,本文给大家介绍JAVA 图像处理基于OpenCv与JVM实现加载保存图像功能,感兴趣的朋友一起看看吧
    2022-01-01
  • 查找jdk安装路径并且切换多版本jdk的详细步骤

    查找jdk安装路径并且切换多版本jdk的详细步骤

    在日常的工作学习中可能需要用到不同版本的jdk,下面这篇文章主要给大家介绍了关于查找jdk安装路径并且切换多版本jdk的详细步骤,文中介绍的非常详细,需要的朋友可以参考下
    2024-01-01
  • java实现秒表功能

    java实现秒表功能

    这篇文章主要为大家详细介绍了java实现秒表功能,利用javax.swing.Timer类设计实现秒表应用程序,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-09-09
  • Java面试synchronized偏向锁后hashcode存址

    Java面试synchronized偏向锁后hashcode存址

    这篇文章主要为大家介绍了Java面试中synchronized偏向锁后hashcode存址详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • Java集合之LinkedHashSet类详解

    Java集合之LinkedHashSet类详解

    这篇文章主要介绍了Java集合之LinkedHashSet类详解,LinkedHashSet 是 Java 中的一个集合类,它是 HashSet 的子类,并实现了 Set 接口,与 HashSet 不同的是,LinkedHashSet 保留了元素插入的顺序,并且具有 HashSet 的快速查找特性,需要的朋友可以参考下
    2023-09-09
  • MyBatis-Plus雪花算法实现源码解读

    MyBatis-Plus雪花算法实现源码解读

    雪花算法是一种用于生成唯一标识符(ID)的分布式算法,雪花算法的设计目标是在分布式系统中生成全局唯一的ID,同时保证ID的有序性和趋势递增,这篇文章主要介绍了MyBatis-Plus雪花算法实现源码解析,需要的朋友可以参考下
    2023-12-12
  • JAVA中数组从小到大排序的2种方法实例

    JAVA中数组从小到大排序的2种方法实例

    JAVA中在运用数组进行排序功能时一般有多种解决方案,下面这篇文章主要给大家介绍了关于JAVA中数组从小到大排序的2种方法,文中都给出了详细的实例代码,需要的朋友可以参考下
    2023-03-03
  • 深入了解Spring中最常用的11个扩展点

    深入了解Spring中最常用的11个扩展点

    我们一说到spring,可能第一个想到的是 IOC(控制反转) 和 AOP(面向切面编程)。除此之外,我们在使用spring的过程中,有没有发现它的扩展能力非常强。今天就来跟大家一起聊聊,在Spring中最常用的11个扩展点
    2022-09-09

最新评论