RabbitMQ Stream插件使用案例代码

 更新时间:2024年04月17日 11:49:49   作者:Doker技术人的品牌  
这篇文章主要介绍了RabbitMQ Stream插件使用案例代码,2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持,需要的朋友可以参考下

2.4版为RabbitMQ流插件引入了对RabbitMQStream插件Java客户端的初始支持。

  • RabbitStreamTemplate
  • StreamListener容器

将spring rabbit流依赖项添加到项目中:

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.1.4</version>
</dependency>

您可以使用RabbitAdmin bean,使用QueueBuilder.stream()方法指定队列类型,正常地配置队列。例如:

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

然而,这仅在您还使用non-stream 组件(如SimpleMessageListenerContainer或DirectMessageListeneerContainer)时才有效,因为在打开AMQP连接时会触发管理员来声明定义的bean。如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应改为配置StreamAdmin:

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

一、Sending Messages

RabbitStreamTemplate提供RabbitTemplate(AMQP)功能的子集。

public interface RabbitStreamOperations extends AutoCloseable {
	CompletableFuture<Boolean> send(Message message);
	CompletableFuture<Boolean> convertAndSend(Object message);
	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
	MessageBuilder messageBuilder();
	MessageConverter messageConverter();
	StreamMessageConverter streamMessageConverter();
	@Override
	void close() throws AmqpException;
}

RabbitStreamTemplate实现具有以下构造函数和属性:

public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverter在convertAndSend方法中用于将对象转换为Spring AMQP消息。

StreamMessageConverter用于将Spring AMQP消息转换为本机流消息。

您也可以直接发送本机流消息;使用messageBuilder()方法提供对生产者的消息生成器的访问。

ProducerCustomizer提供了一种机制,用于在生成生产者之前对其进行自定义。

 二、Receiving Messages

异步消息接收由StreamListenerContainer(以及使用@RabbitListener时的StreamRabbitListerContainerFactory)提供。

侦听器容器需要一个Environment以及一个流名称。

您可以使用经典的MessageListener接收Spring AMQP消息,也可以使用新接口接收本地流消息:

public interface StreamMessageListener extends MessageListener {
	void onStreamMessage(Message message, Context context);
}

有关支持的属性的信息,请参阅消息侦听器容器配置。

与模板类似,容器具有ConsumerCustomizer属性。

有关自定义环境和使用者的信息,请参阅Java客户端文档。

使用@RabbitListener时,配置StreamRabbitListerContainerFactory;此时,大多数@RabbitListener属性(并发等)将被忽略。仅支持id、队列、autoStartup和containerFactory。此外,队列只能包含一个流名称。

三、Examples

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}
@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}
@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

2.4.5版将adviceChain属性添加到StreamListenerContainer(及其工厂)。还提供了一个新的工厂bean来创建一个无状态重试拦截器,该拦截器带有一个可选的StreamMessageRecoverer,用于在使用原始流消息时使用。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}

四、Super Streams

超级流是分区流的抽象概念,通过将多个流队列绑定到具有参数x-Super-Stream:true的交换来实现。

1、调配

为了方便起见,可以通过定义类型为SuperStream的单个bean来提供超级流。

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

RabbitAdmin检测到这个bean,并将声明交换(my.super.stream)和3个队列(分区)-my.super-stream-n,其中n是0,1,2,绑定的路由密钥等于n。

如果您还希望通过AMQP向exchange 发布,您可以提供自定义路由密钥:

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

key 的数量必须等于分区的数量。

2、向超级流生产消息

你必须向 RabbitStreamTemplate 添加一个 superStreamRoutingFunction

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

你也可以通过AMQP发布,使用 RabbitTemplate

到此这篇关于RabbitMQ Stream插件使用详解的文章就介绍到这了,更多相关RabbitMQ Stream插件内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java 使用ImageIO.writer从BufferedImage生成jpeg图像遇到问题总结及解决

    java 使用ImageIO.writer从BufferedImage生成jpeg图像遇到问题总结及解决

    这篇文章主要介绍了java 使用ImageIO.writer从BufferedImage生成jpeg图像遇到问题总结及解决的相关资料,需要的朋友可以参考下
    2017-03-03
  • 几种JAVA细粒度锁的实现方式

    几种JAVA细粒度锁的实现方式

    这篇文章主要为大家详细介绍了几种JAVA细粒度锁的实现方式,感兴趣的小伙伴们可以参考一下
    2016-05-05
  • Spring自动装配@Autowired教程

    Spring自动装配@Autowired教程

    @Autowired注解是Spring中非常重要且常见的,接下来就简要的介绍一下它的用法。@Autowired默认是通过set方法,按照类型自动装配JavaBean,set方法可省略不写,它主要是修饰在成员变量上
    2023-01-01
  • Java中的自定义异常捕获方式

    Java中的自定义异常捕获方式

    这篇文章主要介绍了Java中的自定义异常捕获方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • Java File类常用方法与文件过滤器详解

    Java File类常用方法与文件过滤器详解

    Java File类以抽象的方式代表文件名和目录路径名。该类主要用于文件和目录的创建、文件的查找和文件的删除等。File对象代表磁盘中实际存在的文件和目录。本篇文章我们来讲解File类的常用方法与文件过滤器
    2022-04-04
  • Java并发编程之死锁相关知识整理

    Java并发编程之死锁相关知识整理

    前篇文章在讲解线程安全的时候,有提到过为了保证每个线程都能正常执行共享资源操作,Java引入了锁机制,虽然这样使多线程改善了系统的处理能力,然而也带来了新的问题,其中之一:死锁,需要的朋友可以参考下
    2021-06-06
  • springboot 实现长链接转短链接的示例代码

    springboot 实现长链接转短链接的示例代码

    短链接服务通过将长URL转换成6位短码,并存储长短链接对应关系到数据库中,用户访问短链接时,系统通过查询数据库并重定向到原始URL,实现快速访问,本文就来介绍一下如何使用,感兴趣的可以了解一下
    2024-09-09
  • java 地心坐标系(ECEF)和WGS-84坐标系(WGS84)互转的实现

    java 地心坐标系(ECEF)和WGS-84坐标系(WGS84)互转的实现

    这篇文章主要介绍了java 地心坐标系(ECEF)和WGS-84坐标系(WGS84)互转的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09
  • java调用短信猫发短信示例

    java调用短信猫发短信示例

    这篇文章主要介绍了java调用短信猫发短信示例,需要的朋友可以参考下
    2014-04-04
  • java Jersey框架初体验

    java Jersey框架初体验

    本篇主要是Jersey体验,你将在不做任何编码的情况下,体验Jersey框架的神气魅力!本文还假定你在eclipse里安装了Maven插件
    2016-07-07

最新评论