spring-cloud-stream结合kafka使用详解

 更新时间:2020年08月19日 10:57:16   作者:KyleYaoKeepGoing  
这篇文章主要介绍了spring-cloud-stream结合kafka使用详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

1.pom文件导入依赖

<!-- kafka -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

2.application.yml文件配置

spring: 
 cloud:
  stream:
   kafka:
    binder:
     brokers: xxx.xxx.xxx.xx:xxxx // Kafka的消息中间件服务器地址
   bindings:
    xxx_output: // 通道名称 
     destination: xxx // 消息发往的目的地,对应topic 在发送消息的配置里面,group是不用配置的 
     // 如果我们需要传输json的信息,那么在发送消息端需要设置content-type为json(其实可以不写,默认content-type就是json)
    xxx_input:
     destination: xxx // 消息发往的目的地,对应topic
     group: xxx // 对应kafka的group

3.创建消息发送者

@EnableBinding(Source.class) // @EnableBinding 是绑定通道的,Soure.class是spring 提供的,表示这是一个可绑定的发布通道
@Service
public class MqService {

  @Resource(name = KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
  private MessageChannel oesWorkbenchChannel;

  /**
   * 发送一条kafka消息
   */
  public boolean sendLifeData(Object object) {
    return MqUtils.send(oesWorkbenchChannel, object, KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT);
  }
}

// 发布通道
public interface Source {
  @Output(KafkaConstants.OES_WORKBENCH_LIFE_DATA_OUTPUT)
  MessageChannel oesWorkbenchLifeDataOutput(); // 发布通道用MessageChannel 
}

4.创建消息监听者

@Slf4j
@EnableBinding(Sink.class)
public class WorkbenchStreamListener {

  @Resource
  private FileService fileService;

  @StreamListener(KafkaConstants.xxx_input) // 监听接受通道
  public void receiveData(MoveMessage moveMessage) {
  }
}

// 接受通道
public interface Sink {
  @Input(KafkaConstants.OES_WORKBENCH_MOVE_INPUT)
  SubscribableChannel oesWorkbenchMoveInput(); // 接受通道用SubscribableChannel 
}

接下来就可以愉快的发送监听消息了

到此这篇关于spring-cloud-stream结合kafka使用详解的文章就介绍到这了,更多相关spring-cloud-stream整合kafka内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring Cloud入门教程之Zuul实现API网关与请求过滤

    Spring Cloud入门教程之Zuul实现API网关与请求过滤

    这篇文章主要给大家介绍了关于Spring Cloud入门教程之Zuul实现API网关与请求过滤的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
    2018-05-05
  • SpringBoot 启动流程追踪方法分享

    SpringBoot 启动流程追踪方法分享

    这篇文章主要介绍了SpringBoot 启动流程追踪方法分享的相关资料,需要的朋友可以参考下
    2023-08-08
  • java处理解析带有反斜杠的json

    java处理解析带有反斜杠的json

    在Java中操作JSON数据是一项常见的任务,其中一个常见的问题是如何在JSON字符串中包含反斜杠,本文主要介绍了java处理解析带有反斜杠的json,感兴趣的可以了解一下
    2024-01-01
  • 浅谈JVM系列之JIT中的Virtual Call

    浅谈JVM系列之JIT中的Virtual Call

    什么是Virtual Call?Virtual Call在java中的实现是怎么样的?Virtual Call在JIT中有没有优化?所有的答案看完这篇文章就明白了。
    2021-06-06
  • 一场由Java中Integer引发的踩坑实战

    一场由Java中Integer引发的踩坑实战

    Java中的数据类型分为基本数据类型和复杂数据类型int是前者而integer是后者(也就是一个类),下面这篇文章主要给大家介绍了关于由Java中Integer引发的踩坑实战,需要的朋友可以参考下
    2022-11-11
  • springboot整合rocketmq实现分布式事务

    springboot整合rocketmq实现分布式事务

    大多数情况下很多公司是使用消息队列的方式实现分布式事务。 本篇文章重点讲解springboot环境下整合rocketmq实现分布式事务,感兴趣的可以了解一下
    2021-05-05
  • 使用Get方式提交数据到Tomcat服务器的方法

    使用Get方式提交数据到Tomcat服务器的方法

    这篇文章将介绍向服务器发送数据,并且服务器将数据的处理结果返回给客户端,本文给大家介绍使用Get方式向服务器发送数据,感兴趣的朋友一起学习吧
    2016-04-04
  • Java 基础之内部类详解及实例

    Java 基础之内部类详解及实例

    这篇文章主要介绍了 Java 基础之内部类详解及实例的相关资料,需要的朋友可以参考下
    2017-03-03
  • ThreadPoolExecutor核心线程数和RocketMQ消费线程调整详解

    ThreadPoolExecutor核心线程数和RocketMQ消费线程调整详解

    这篇文章主要介绍了ThreadPoolExecutor核心线程数和RocketMQ消费线程调整详解,Rocketmq 消费者在高峰期希望手动减少消费线程数,通过DefaultMQPushConsumer.updateCorePoolSize方法可以调用内部的setCorePoolSize设置多线程核心线程数,需要的朋友可以参考下
    2023-10-10
  • 一文带你搞懂Java中方法重写与方法重载的区别

    一文带你搞懂Java中方法重写与方法重载的区别

    这篇文章主要介绍了Java中方法重写与方法重载有哪些区别,文中有详细的代码示例,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2023-05-05

最新评论