SpringCloud中的Stream服务间消息传递详解

 更新时间:2024年01月31日 09:52:11   作者:卷不动躺不平的粥  
这篇文章主要介绍了SpringCloud中的Stream服务间消息传递详解,Stream 就是在消息队列的基础上,对其进行封装,可以是我们更方便的去使用,Stream应用由第三方的中间件组成,应用间的通信通过输入通道和输出通道完成,需要的朋友可以参考下

一、Stream 的介绍

Stream 就是在消息队列的基础上,对其进行封装,可以是我们更方便的去使用。

Spring Cloud Stream应用由第三方的中间件组成。应用间的通信通过输入通道(input channel)和输出通道(output channel)完成。这些通道是有Spring Cloud Stream 注入的。而通道与外部的代理(可以理解为上文所说的数据中心)的连接又是通过Binder实现的。

二、Stream 的快速入门

2.1 编辑消费者

2.1.1 导入相关依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2.1.2 编写配置文件

spring:
  rabbitmq:
    host: 192.168.31.138
    port: 5672
    username: test
    password: test
    virtual-host: /test

2.1.3 声明 channel(通道)

通过 @Input() 注解来指定所要声明的通道。

public interface StreamClient {
    @Input("myMessage")
    SubscribableChannel input();
}

被 @Input 和@Output 注解的方法。其中 @Input 注解的方法返回的是 SubscribableChannel ,@Output 注解的方法返回的是 MessageChannel 。

声明通道(channel)的方法就是使用 @Input 和 @Output 注解方法。你想要多少通道就注解多少方法。

默认情况下,通道的名称就是注解的方法的名称,如果需要自己指定,只需要给这两个注解传递 String 类型的参数即可。

使用@Input或者@Output注解声明了通道(channel)的接口。Spring Cloud Stream会自动实现这些接口。

2.1.4 创建和绑定 channel(通道)

使用 @EnableBinding 就能创建和绑定通道(channel)。

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(StreamClient.class)
public class SearchApplication {
    public static void main(String[] args) {
        SpringApplication.run(SearchApplication.class,args);
    }
}

@EnableBinding 注解接收的参数就是使用 @Input 或者 @Output 注解声明了通道(channel)的接口。

2.1.5 消费消息

@StreamListener 接收的参数是要处理的通道(channel)的名,所注解的方法就是处理从通道获取到的数据的方法。方法的参数就是获取到的数据。

@Component
public class StreamReceiver {
    @StreamListener("myMessage")
    public void msg(Object msg){
        System.out.println("接收到消息:"+msg);
    }
}

2.2 编辑生产者

2.2.1 导入相关依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2.2.2 编写配置文件

spring:
  rabbitmq:
    host: 192.168.31.138
    port: 5672
    username: test
    password: test
    virtual-host: /test

2.2.3 声明 channel(通道)

public interface StreamClient {
    @Output("myMessage")
    MessageChannel output();
}

2.2.4 创建和绑定

@SpringBootApplication
.......
@EnableBinding(StreamClient.class)
public class CustomerApplication {
    public static void main(String[] args) {
        SpringApplication.run(CustomerApplication.class,args);
    }
    
	........
        
}

2.2.5 生产消息

@RestController
public class MessageController {

    @Autowired
    private StreamClient streamClient;

    @GetMapping("/send")
    public String send(){
        streamClient.output().send(MessageBuilder.withPayload("Hello stream!!!!!").build());
        return "消息发送成功!";
    }
}

2.3 测试

三、Stream 重复消费消息

避免一个消息被多个消费者消费,只需要将多个消费者指定为一个消费者组即可。

**消费组:**直观的理解就是一群消费者一起处理消息(每个发送到消费组的数据,仅由消费组中的一个消费者处理)。

spring:
  cloud:
    stream:
      bindings:
        myMessage: #指定channel
          group: customer #指定消费者组

四、Stream 消费者的手动 ACK

4.1 编写配置文件

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          myMessage: #指定 channel name
            consumer: 
              acknowledgeMode: MANUAL # 指定规则默认 AUTO

4.2 修改消费消息的方法

消息是带有 Header 的,类似 Http 的 headler,我们可以通过 @Header 来获取指定的 Header。

@Component
public class StreamReceiver {
    @StreamListener("myMessage")
    public void msg(Object msg,
                    @Header(name = AmqpHeaders.CHANNEL) Channel channel,
                    @Header(name = AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
        System.out.println("接收到消息:"+msg);
        channel.basicAck(deliveryTag,false);
    }
}

到此这篇关于SpringCloud中的Stream服务间消息传递详解的文章就介绍到这了,更多相关SpringCloud的Stream内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot实现过滤器拦截器的耗时对比

    SpringBoot实现过滤器拦截器的耗时对比

    这篇文章主要为大家详细介绍了SpringBoot实现过滤器拦截器的输出接口耗时对比,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一下
    2022-06-06
  • java实现航空用户管理系统

    java实现航空用户管理系统

    这篇文章主要为大家详细介绍了java实现航空用户管理系统,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-07-07
  • 详解Java中Thread 和Runnable区别

    详解Java中Thread 和Runnable区别

    这篇文章主要介绍了Java中Thread 和Runnable的区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-03-03
  • Java的ConcurrentLinkedQueue源码分析

    Java的ConcurrentLinkedQueue源码分析

    这篇文章主要介绍了Java的ConcurrentLinkedQueue源码分析,ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全的队列,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素,需要的朋友可以参考下
    2023-12-12
  • SpringCloud中的服务接口(api)

    SpringCloud中的服务接口(api)

    这篇文章主要介绍了SpringCloud中的服务接口(api),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-04-04
  • IDEA查看Scala的源码的教程图解

    IDEA查看Scala的源码的教程图解

    这篇文章主要介绍了IDEA查看Scala的源码的方法,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • Spring框架 XML配置事务控制的步骤操作

    Spring框架 XML配置事务控制的步骤操作

    这篇文章主要介绍了Spring框架 XML配置事务控制的步骤操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • Java中的while循环语句详细讲解

    Java中的while循环语句详细讲解

    这篇文章主要给大家介绍了关于Java中while循环语句的相关资料,while循环是一种在编程中常见的控制流语句,它允许代码在特定条件下(通常是一个布尔表达式)重复执行一段代码,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2024-03-03
  • 更改eclipse的JDK版本详细步骤

    更改eclipse的JDK版本详细步骤

    我们用eclipse在做项目的时候会切换jdk版本,本地运行的项目所使用的jdk版本比Linux服务器高(低),需要调低(高)JDK版本,这篇文章主要给大家介绍了关于如何更改eclipse的JDK版本的相关资料,需要的朋友可以参考下
    2023-11-11
  • SpringMVC中使用Thymeleaf模板引擎实例代码

    SpringMVC中使用Thymeleaf模板引擎实例代码

    这篇文章主要介绍了SpringMVC中使用Thymeleaf模板引擎实例代码,分享了相关代码示例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
    2018-02-02

最新评论