解决SpringCloudStream整合Kafka,两个通道对应同一个topic报错的情况

 更新时间:2026年05月06日 09:09:58   作者:加把劲骑士RideOn  
文章指出通道需唯一对应topic,否则会报错,因两个通道共用一个topic导致绑定失败,通过修改配置文件,为不同通道分配不同的topic解决

总结

  1. 一个通道(如:evad_input)只能唯一对应一个topic,否则会报错
  2. 消费者组则可以被多个通道共同使用

报错日志

2022-05-25 14:46:03.697 ERROR 17108 --- [ask-scheduler-1] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds
。。。
org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
。。。
Caused by: org.springframework.beans.factory.support.BeanDefinitionOverrideException: Invalid bean definition with name 'Evad.consumer-group-evad.errors.recoverer' defined in null。。。

问题所在

yml配置文件中定义的两个通道:evad_input和devilvan_input,却共用了一个topic:Evad,导致绑定失败。

配置文件

spring:
  application:
    name: devilvan-kafka
  cloud:
    stream:
      default-binder: kafka
      bindings:
        evad_input:
          destination: Evad
          binder: kafka
          group: consumer-group-evad
          content-type: text/plain
        evad_output:
          destination: Evad
          binder: kafka
          content-type: text/plain
        devilvan_input:
          # 一个通道只能唯一对应一个topic,否则会报binder
          destination: Evad
          binder: kafka
          # 一个消费者组可以被多个通道使用
          group: consumer-group-evad
          content-type: text/plain
        devilvan_output:
          destination: Evad
          binder: kafka
          content-type: text/plain

解决方法

新定义一个Topic:Evad05,使devilvan通道对应topic,区别于evad通道对应的topic

修改后

spring:
  application:
    name: devilvan-kafka
  cloud:
    stream:
      default-binder: kafka
      bindings:
        evad_input:
          destination: Evad
          binder: kafka
          group: consumer-group-evad
          content-type: text/plain
        evad_output:
          destination: Evad
          binder: kafka
          content-type: text/plain
        devilvan_input:
          # 一个通道只能唯一对应一个topic,否则会报binder
          destination: Evad05
          binder: kafka
          # 一个消费者组可以被多个通道使用
          group: consumer-group-evad
          content-type: text/plain
        devilvan_output:
          destination: Evad05
          binder: kafka
          content-type: text/plain

代码

1. XXXController(生产消息的控制器)

    @PostMapping("sendEvadMessage")
    public ResultMessage<String> sendEvadMessage(@RequestBody String message) {
        ResultMessage<String> resultMessage = new ResultMessage<>();
        sender.sendEvadMessage(message);
        resultMessage.setData(message);
        return resultMessage.success();
    }

    @PostMapping("sendDevilvanMessage")
    public ResultMessage<String> sendDevilvanMessage(@RequestBody String message) {
        ResultMessage<String> resultMessage = new ResultMessage<>();
        sender.sendDevilvanMessage(message);
        resultMessage.setData(message);
        return resultMessage.success();
    }

2. 自定义通道

public interface EvadChannel {
    String EVAD_INPUT = "evad_input";
    String EVAD_OUTPUT = "evad_output";
    String DEVILVAN_INPUT = "devilvan_input";
    String DEVILVAN_OUTPUT = "devilvan_output";

    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(EVAD_INPUT)
    MessageChannel receiveEvadMessage();

    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(EVAD_OUTPUT)
    MessageChannel sendEvadMessage();

    /**
     * 缺省接收消息通道
     * @return channel 返回缺省信息接收通道
     */
    @Input(DEVILVAN_INPUT)
    MessageChannel receiveDevilvanMessage();

    /**
     * 缺省发送消息通道
     * @return channel 返回缺省信息发送通道
     */
    @Output(DEVILVAN_OUTPUT)
    MessageChannel sendDevilvanMessage();
}

3. EvadMessageSender(通过通道发送消息)

@Slf4j
@Component
public class EvadMessageSender {
    @Autowired
    private EvadChannel channel;

    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     *
     * @param message
     */
    public void sendEvadMessage(String message) {
        channel.sendEvadMessage().send(MessageBuilder.withPayload(message).build());
    }

    /**
     * 消息发送到默认通道:缺省通道对应缺省主题
     *
     * @param message
     */
    public void sendDevilvanMessage(String message) {
        channel.sendDevilvanMessage().send(MessageBuilder.withPayload(message).build());
    }
}

4. EvadReceiveListener(订阅/消费者)

@Slf4j
@Configuration
@EnableBinding(value = EvadChannel.class)
public class EvadReceiveListener {
    @StreamListener(EvadChannel.EVAD_INPUT)
    public void receiveEvadMessage(Message<String> message) {
        log.info("{}    订阅消息:通道 = " + EvadChannel.EVAD_INPUT + ",data = {}",
                DateUtil.now(), message.getPayload());
    }

    @StreamListener(EvadChannel.DEVILVAN_INPUT)
    public void receiveDevilvanMessage(Message<String> message) {
        log.info("{}    订阅消息:通道 = " + EvadChannel.DEVILVAN_INPUT + ",data = {}",
                DateUtil.now(), message.getPayload());
    }
}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • JPA @Query时,无法使用limit函数的问题及解决

    JPA @Query时,无法使用limit函数的问题及解决

    这篇文章主要介绍了JPA @Query时,无法使用limit函数的问题及解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • Maven发布项目到Nexus私有服务器

    Maven发布项目到Nexus私有服务器

    本文主要介绍了Maven发布项目到Nexus私有服务器,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • java线程池对象ThreadPoolExecutor的深入讲解

    java线程池对象ThreadPoolExecutor的深入讲解

    在我们的开发中“池”的概念并不罕见,有数据库连接池、线程池、对象池、常量池等等。下面这篇文章主要给大家介绍了关于java线程池对象ThreadPoolExecutor的相关资料,需要的朋友可以参考借鉴,下面来一起看看吧
    2018-09-09
  • IDEA中安装和使用Lombok插件的方法

    IDEA中安装和使用Lombok插件的方法

    Lombok是一个可以通过简单的注解形式来帮助我们简化消除一些必须有但显得很臃肿的Java代码的工具,通过使用对应的注解,可以在编译源码的时候生成对应的方法,本文重点给大家介绍IDEA中安装和使用Lombok插件的方法,感兴趣的朋友一起看看吧
    2021-06-06
  • mybatis处理枚举类的简单方法

    mybatis处理枚举类的简单方法

    这篇文章主要给大家介绍了关于mybatis处理枚举类的简单方法,文中通过示例代码介绍的非常详细,对大家学习或者使用mybatis具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
    2019-05-05
  • Java9中对集合类扩展的of方法解析

    Java9中对集合类扩展的of方法解析

    这篇文章主要介绍了Java9 中对集合类扩展的of方法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java基于Tabula实现PDF合并单元格内容的提取

    Java基于Tabula实现PDF合并单元格内容的提取

    这篇文章主要为大家详细介绍了Java如何基于Tabula实现PDF合并单元格内容的提取,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2025-10-10
  • Java实现插入排序实例

    Java实现插入排序实例

    这篇文章主要介绍了Java实现插入排序,实例分析了Java的插入排序原理与实现技巧,非常具有实用价值,需要的朋友可以参考下
    2015-02-02
  • java实现猜字母游戏

    java实现猜字母游戏

    这篇文章主要为大家详细介绍了java实现猜字母小游戏,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-03-03
  • idea快速搭建spring cloud注册中心与注册的方法

    idea快速搭建spring cloud注册中心与注册的方法

    这篇文章主要介绍了idea快速搭建spring cloud注册中心与注册的方法,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-07-07

最新评论