解决SpringCloudStream整合Kafka,两个通道对应同一个topic报错的情况

 更新时间:2026年05月06日 09:09:58   作者:加把劲骑士RideOn  
文章指出通道需唯一对应topic,否则会报错,因两个通道共用一个topic导致绑定失败,通过修改配置文件,为不同通道分配不同的topic解决

总结

  1. 一个通道(如:evad_input)只能唯一对应一个topic,否则会报错
  2. 消费者组则可以被多个通道共同使用

报错日志

2022-05-25 14:46:03.697 ERROR 17108 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds
。。。
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
。。。
Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'Evad.consumer-group-evad.errors.recoverer' defined in null。。。

问题所在

yml配置文件中定义的两个通道:evad_input和devilvan_input,却共用了一个topic:Evad,导致绑定失败。

配置文件

spring:
  application:
    name: devilvan-kafka
  cloud:
    stream:
      default-binder: kafka
      bindings:
        evad_input:
          destination: Evad
          binder: kafka
          group: consumer-group-evad
          content-type: text/plain
        evad_output:
          destination: Evad
          binder: kafka
          content-type: text/plain
        devilvan_input:
          # 一个通道只能唯一对应一个topic,否则会报binder
          destination: Evad
          binder: kafka
          # 一个消费者组可以被多个通道使用
          group: consumer-group-evad
          content-type: text/plain
        devilvan_output:
          destination: Evad
          binder: kafka
          content-type: text/plain

解决方法

新定义一个Topic:Evad05,使devilvan通道对应topic,区别于evad通道对应的topic

修改后

spring:
  application:
    name: devilvan-kafka
  cloud:
    stream:
      default-binder: kafka
      bindings:
        evad_input:
          destination: Evad
          binder: kafka
          group: consumer-group-evad
          content-type: text/plain
        evad_output:
          destination: Evad
          binder: kafka
          content-type: text/plain
        devilvan_input:
          # 一个通道只能唯一对应一个topic,否则会报binder
          destination: Evad05
          binder: kafka
          # 一个消费者组可以被多个通道使用
          group: consumer-group-evad
          content-type: text/plain
        devilvan_output:
          destination: Evad05
          binder: kafka
          content-type: text/plain

代码

1. XXXController(生产消息的控制器)

    @PostMapping("sendEvadMessage")
    public ResultMessage<String> sendEvadMessage(@RequestBody String message) {
        ResultMessage<String> resultMessage = new ResultMessage<>();
        sender.sendEvadMessage(message);
        resultMessage.setData(message);
        return resultMessage.success();
    }

    @PostMapping("sendDevilvanMessage")
    public ResultMessage<String> sendDevilvanMessage(@RequestBody String message) {
        ResultMessage<String> resultMessage = new ResultMessage<>();
        sender.sendDevilvanMessage(message);
        resultMessage.setData(message);
        return resultMessage.success();
    }

2. 自定义通道

public interface EvadChannel {
    String EVAD_INPUT = "evad_input";
    String EVAD_OUTPUT = "evad_output";
    String DEVILVAN_INPUT = "devilvan_input";
    String DEVILVAN_OUTPUT = "devilvan_output";

    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(EVAD_INPUT)
    MessageChannel receiveEvadMessage();

    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(EVAD_OUTPUT)
    MessageChannel sendEvadMessage();

    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(DEVILVAN_INPUT)
    MessageChannel receiveDevilvanMessage();

    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(DEVILVAN_OUTPUT)
    MessageChannel sendDevilvanMessage();
}

3. EvadMessageSender(通过通道发送消息)

@Slf4j
@Component
public class EvadMessageSender {
    @Autowired
    private EvadChannel channel;

    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     *
     * @param message
     */
    public void sendEvadMessage(String message) {
        channel.sendEvadMessage().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     *
     * @param message
     */
    public void sendDevilvanMessage(String message) {
        channel.sendDevilvanMessage().send(MessageBuilder.withPayload(message).build());
    }
}

4. EvadReceiveListener(订阅/消费者)

@Slf4j
@Configuration
@EnableBinding(value = EvadChannel.class)
public class EvadReceiveListener {
    @StreamListener(EvadChannel.EVAD_INPUT)
    public void receiveEvadMessage(Message<String> message) {
        log.info("{}    订阅消息:通道 = " + EvadChannel.EVAD_INPUT + ",data = {}",
                DateUtil.now(), message.getPayload());
    }

    @StreamListener(EvadChannel.DEVILVAN_INPUT)
    public void receiveDevilvanMessage(Message<String> message) {
        log.info("{}    订阅消息:通道 = " + EvadChannel.DEVILVAN_INPUT + ",data = {}",
                DateUtil.now(), message.getPayload());
    }
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • 什么是RESTful API,有什么作用

    什么是RESTful API,有什么作用

    提到RESTful API大家势必或多或少听说过,但是什么是RESTful API ?如何理解RESTful API 呢?今天咱们就来聊聊这个RESTful API
    2023-11-11
  • springboot中项目启动时实现初始化方法加载参数

    springboot中项目启动时实现初始化方法加载参数

    这篇文章主要介绍了springboot中项目启动时实现初始化方法加载参数,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • 详解SpringBoot中添加@ResponseBody注解会发生什么

    详解SpringBoot中添加@ResponseBody注解会发生什么

    这篇文章主要介绍了详解SpringBoot中添加@ResponseBody注解会发生什么,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • Java如何把int类型转换成byte

    Java如何把int类型转换成byte

    这篇文章主要介绍了Java如何把int类型转换成byte,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02
  • SpringBoot中时间格式化的五种方法汇总

    SpringBoot中时间格式化的五种方法汇总

    时间格式化在项目中使用频率是非常高的,这篇文章主要给大家介绍了关于SpringBoot中时间格式化的五种方法,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2021-07-07
  • Springboot集成spring data elasticsearch过程详解

    Springboot集成spring data elasticsearch过程详解

    这篇文章主要介绍了springboot集成spring data elasticsearch过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • Java 中很好用的数据结构(你绝对没用过)

    Java 中很好用的数据结构(你绝对没用过)

    今天跟大家介绍的就是 java.util.EnumMap,也是 java.util 包下面的一个集合类,同样的也有对应的的 java.util.EnumSet,对java数据结构相关知识感兴趣的朋友一起看看吧
    2022-05-05
  • SpringBoot运用AOP来实现分布式锁的示例代码

    SpringBoot运用AOP来实现分布式锁的示例代码

    本文主要介绍了通过注解和AOP实现分布式锁的方案,包含锁过期时间、等待超时设置及自动续约功能,利用定时任务监控锁状态并延长有效期,感兴趣的可以了解一下
    2025-09-09
  • java中@NotBlank限制属性不能为空

    java中@NotBlank限制属性不能为空

    在实体类的对应属性上添 @NotBlank注解,可以实现对空置的限制,本文就来介绍一下java中@NotBlank限制属性不能为空,感兴趣的可以了解一下
    2024-01-01
  • 关于@DS注解切换数据源失败的原因实战记录

    关于@DS注解切换数据源失败的原因实战记录

    项目配置了多个数据源,需要使用@DS注解来切换数据源,但是却遇到了问题,下面这篇文章主要给大家介绍了关于@DS注解切换数据源失败原因的相关资料,需要的朋友可以参考下
    2023-05-05

最新评论