springboot如何开启和关闭kafka消费

 更新时间:2024年12月16日 09:44:17   作者:阿拉的梦想  
在Kafka消费者中,通过关闭自动消费配置,使用自定义容器工厂,并在消费监听器上设置id,可以手动控制消费的开启和关闭,这是根据个人经验总结的方法,旨在帮助其他开发者

springboot开启和关闭kafka消费

关闭kafka自动消费

配置自定义容器工厂

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.stereotype.Component;

@Component
@Configuration
public class kafkaConfig {

    @Autowired
    private ConsumerFactory<String, String> consumerFactory;

    @Bean("pingKafkaFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<String, String>();
        container.setConsumerFactory(consumerFactory);
        //禁止自动启动
        container.setAutoStartup(false);
        return container;
    }
}

在消费监听器上使用工厂,并设置id

@KafkaListener(topics = "#{pingProperties.getTopic().split(',')}",id = "pingConsumer",containerFactory = "pingKafkaFactory")

这样,启动项目后,就不会自动消费了。

手动开启和关闭消费

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;

/**
 * Kafka消费监听服务实现类.
 */
@Service
@Slf4j
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {


    /**
     * registry.
     */
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 开启监听.
     *
     * @param listenerId 监听ID
     */
    @Override
    public void startListener(String listenerId) {
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer(listenerId).isRunning()) {
            registry.getListenerContainer(listenerId).start();
        }
        //项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思
        //registry.getListenerContainer(listenerId).stop();
        log.info(listenerId + "开启监听成功。");
    }

    /**
     * 停止监听.
     *
     * @param listenerId 监听ID
     */
    @Override
    public void stopListener(String listenerId) {
        registry.getListenerContainer(listenerId).stop();
        log.info(listenerId + "停止监听成功。");
    }

}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • springboot执行延时任务之DelayQueue实例

    springboot执行延时任务之DelayQueue实例

    这篇文章主要介绍了springboot执行延时任务之DelayQueue实例,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-02-02
  • Java 5亿整数大文件怎么排序

    Java 5亿整数大文件怎么排序

    这篇文章主要介绍了Java 5亿整数大文件怎么排序,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-03-03
  • java 字符串词频统计实例代码

    java 字符串词频统计实例代码

    java 字符串词频统计实例代码,需要的朋友可以参考一下
    2013-03-03
  • hibernate增删改查操作代码

    hibernate增删改查操作代码

    这篇文章主要介绍了hibernate增删改查操作代码,需要的朋友可以参考下
    2017-09-09
  • SpringBoot中的事务配置管理详解

    SpringBoot中的事务配置管理详解

    这篇文章主要介绍了SpringBoot中的事务配置管理详解,Spring Boot 默认的事务规则是遇到运行异常(RuntimeException)和程序错误(Error)才会回滚,但是抛出SQLException就无法回滚了,需要的朋友可以参考下
    2024-01-01
  • 使用Java实现价格加密与优化功能

    使用Java实现价格加密与优化功能

    在现代软件开发中,数据加密是一个非常重要的环节,尤其是在处理敏感信息(如价格、用户数据等)时,本文将详细介绍如何使用 Java 实现价格加密,并对代码进行优化,需要的朋友可以参考下
    2025-01-01
  • mybatis-plus返回map自动转驼峰配置操作

    mybatis-plus返回map自动转驼峰配置操作

    这篇文章主要介绍了mybatis-plus返回map自动转驼峰配置操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-11-11
  • java中&和&&的基本区别与常见的误用

    java中&和&&的基本区别与常见的误用

    & 和&&是Java中用于逻辑运算的两个运算符,&是按位与和逻辑与兼用的运算符,而&&仅用于逻辑与运算,并具有短路特性,这篇文章主要介绍了java中&和&&的基本区别与常见的误用,需要的朋友可以参考下
    2025-02-02
  • Java 超详细讲解抽象类与接口的使用

    Java 超详细讲解抽象类与接口的使用

    对于面向对象编程来说,抽象是它的一大特征之一,在 Java 中可以通过两种形式来体现OOP的抽象:接口和抽象类,下面这篇文章主要给大家介绍了关于Java入门基础之抽象类与接口的相关资料,需要的朋友可以参考下
    2022-04-04
  • 基于log4j2.properties踩坑与填坑

    基于log4j2.properties踩坑与填坑

    这篇文章主要介绍了log4j2.properties踩坑与填坑方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12

最新评论