springboot rabbitmq整合rabbitmq之消息持久化存储问题

 更新时间:2023年09月28日 08:50:08   作者:weixin_43831204  
这篇文章主要介绍了springboot rabbitmq整合rabbitmq之消息持久化存储问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

rabbitmq消息持久化存储包含三个方面

  • 1、exchange的持久化
  • 2、queue的持久化
  • 3、message的持久化

exchange的持久化

在申明exchange的时候,有个参数:durable。

当该参数为true,则对该exchange做持久化,重启rabbitmq服务器,该exchange不会消失。

durable的默认值为true

public class DirectExchange extends AbstractExchange {
    public static final DirectExchange DEFAULT = new DirectExchange("");
    public DirectExchange(String name) {
        super(name);
    }
    public DirectExchange(String name, boolean durable, boolean autoDelete) {
        super(name, durable, autoDelete);
    }
    public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        super(name, durable, autoDelete, arguments);
    }
    public final String getType() {
        return "direct";
    }
}
public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    private final String name;
    private final boolean durable;
    private final boolean autoDelete;
    private final Map<String, Object> arguments;
    private volatile boolean delayed;
    private boolean internal;
    public AbstractExchange(String name) {
        this(name, true, false);
    }
    public AbstractExchange(String name, boolean durable, boolean autoDelete) {
        this(name, durable, autoDelete, (Map)null);
    }
    public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
        this.name = name;
        this.durable = durable;
        this.autoDelete = autoDelete;
        if (arguments != null) {
            this.arguments = arguments;
        } else {
            this.arguments = new HashMap();
        }
    }

queue的持久化

申明队列时也有个参数:durable。

当该参数为true,则对该queue做持久化,重启rabbitmq服务器,该queue不会消失。

durable的默认值为true

public Queue(String name) {
        this(name, true, false, false);
    }
    public Queue(String name, boolean durable) {
        this(name, durable, false, false, (Map)null);
    }
    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, (Map)null);
    }
    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
        Assert.notNull(name, "'name' cannot be null");
        this.name = name;
        this.actualName = StringUtils.hasText(name) ? name : Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration";
        this.durable = durable;
        this.exclusive = exclusive;
        this.autoDelete = autoDelete;
        this.arguments = (Map)(arguments != null ? arguments : new HashMap());
    }

message的持久化

前面我们已经讲到exchange与queue的持久化,那么message如何持久化呢?

我们在使用rabbit-client做消息持久化时,设置了BasicProperties的deliveryMode为2,做消息的持久化。

AMQP.BasicProperties properties = new AMQP.BasicProperties.
                Builder().
                deliveryMode(2).
                build();
        channel.basicPublish("ex.pc", "key.pc",  properties, "hello world".getBytes());

那么整合了spring boot,使用RabbitTemplate如何做持久化?

首先,我们来到经常的使用的消息发送方法:RabbitTemplate类下的convertAndSend

@Override
    public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
        convertAndSend(exchange, routingKey, object, (CorrelationData) null);
    }

然后调用了该类下的重载方法:convertAndSend。该方法中将object 转换成了message

@Override
    public void convertAndSend(String exchange, String routingKey, final Object object,
            @Nullable CorrelationData correlationData) throws AmqpException {
        send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }

在做消息转换的时候,我们注意到,传入了一个MessageProperties对象

protected Message convertMessageIfNecessary(final Object object) {
        if (object instanceof Message) {
            return (Message) object;
        }
        return getRequiredMessageConverter().toMessage(object, new MessageProperties());
    }

在MessageProperties中,有个deliveryMode属性,该属性默认值为:MessageDeliveryMode.PERSISTENT(持久化的)

 public MessageProperties() {
        this.deliveryMode = DEFAULT_DELIVERY_MODE;
        this.priority = DEFAULT_PRIORITY;
    }
static {
    DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
    DEFAULT_PRIORITY = 0;
}

消息转换完成后,调用时同类方法的send方法

@Override
    public void send(final String exchange, final String routingKey,
            final Message message, @Nullable final CorrelationData correlationData)
            throws AmqpException {
        execute(channel -> {
            doSend(channel, exchange, routingKey, message,
                    (RabbitTemplate.this.returnCallback != null
                            || (correlationData != null && StringUtils.hasText(correlationData.getId())))
                            && RabbitTemplate.this.mandatoryExpression.getValue(
                                    RabbitTemplate.this.evaluationContext, message, Boolean.class),
                    correlationData);
            return null;
        }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
    }

该方法又调用了doSend方法

public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
            boolean mandatory, @Nullable CorrelationData correlationData)
                    throws Exception { // NOSONAR TODO: change to IOException in 2.2.
        String exch = exchangeArg;
        String rKey = routingKeyArg;
        if (exch == null) {
            exch = this.exchange;
        }
        if (rKey == null) {
            rKey = this.routingKey;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Publishing message " + message
                    + "on exchange [" + exch + "], routingKey = [" + rKey + "]");
        }
        Message messageToUse = message;
        MessageProperties messageProperties = messageToUse.getMessageProperties();
        if (mandatory) {
            messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
        }
        if (this.beforePublishPostProcessors != null) {
            for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
                messageToUse = processor.postProcessMessage(messageToUse, correlationData);
            }
        }
        setupConfirm(channel, messageToUse, correlationData);
        if (this.userIdExpression != null && messageProperties.getUserId() == null) {
            String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
            if (userId != null) {
                messageProperties.setUserId(userId);
            }
        }
        sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
        // Check if commit needed
        if (isChannelLocallyTransacted(channel)) {
            // Transacted channel created by this template -> commit.
            RabbitUtils.commitIfNecessary(channel);
        }
    }

在该方法中我们终于看到了发送消息到rabbitmq的操作:sendToRabbit。

该方法将MessageProperties对象转换成了BasicProperties。

至此,我们终于了解了,spring rabbit 中如何实现messge的持久化。

默认的message就是持久化的

protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
            Message message) throws IOException {
        BasicProperties convertedMessageProperties = this.messagePropertiesConverter
                .fromMessageProperties(message.getMessageProperties(), this.encoding);
        channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
    }

如何改变message的持久化属性?

根据上面的源码分析,spring中默认的message就是持久化的,如何改变持久化属性?

1、使用send方法,发送message。设置message中MessageProperties的属性deliveryMode

2、自定义MessageConverter,在消息转换时,设置MessageProperties的属性deliveryMode

3、自定MessagePropertiesConverter,在MessageProperties对象转换成BasicProperties时,设置deliveryMode

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Java生成读取条形码和二维码的简单示例

    Java生成读取条形码和二维码的简单示例

    条形码(barcode)是将宽度不等的多个黑条和空白,按照一定的规则排列,用来表示一组信息的图形标识符,而二维码大家应该都很熟悉了,这篇文章主要给大家介绍了关于Java生成读取条形码和二维码的相关资料,需要的朋友可以参考下
    2021-07-07
  • java算法题解Leetcode15三数之和实例

    java算法题解Leetcode15三数之和实例

    这篇文章主要为大家介绍了java算法题解Leetcode15三数之和实例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-01-01
  • Springboot apollo原理及使用方法详解

    Springboot apollo原理及使用方法详解

    这篇文章主要介绍了Springboot apollo原理及使用方法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-06-06
  • spring AOP代理执行@EnableAspectJAutoProxy的exposeProxy属性详解

    spring AOP代理执行@EnableAspectJAutoProxy的exposeProxy属性详解

    这篇文章主要为大家介绍了spring AOP代理执行@EnableAspectJAutoProxy的exposeProxy属性详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-09-09
  • Spring的注解简单介绍

    Spring的注解简单介绍

    这篇文章主要介绍了Spring的注解简单介绍,具有一定借鉴价值,需要的朋友可以参考下
    2017-12-12
  • Java之SpringBoot-Thymeleaf详情

    Java之SpringBoot-Thymeleaf详情

    聊Thymeleaf,需要知道为什么到了SpringBoot中就不用JSP了?这跟SpringBoot打包方式有点关系,SpringBoot项目打包是jar包,下面文章小编就对此做一个详细介绍,需要的朋友可以参考一下
    2021-09-09
  • Java NIO Buffer过程详解

    Java NIO Buffer过程详解

    这篇文章主要介绍了Java NIO Buffer过程详解,缓冲区在java nio中负责数据的存储。缓冲区就是数组。用于存储不同数据类型的数据。,需要的朋友可以参考下
    2019-06-06
  • 在Java中FreeMarker 模板来定义字符串模板

    在Java中FreeMarker 模板来定义字符串模板

    这篇文章主要介绍了在Java中FreeMarker 模板来定义字符串模板,文章基于Java的相关资料展开详细内容,需要的小伙伴可以参考一下
    2022-04-04
  • 解决SpringBoot项目读取yml文件中值为中文时,在视图页面显示乱码

    解决SpringBoot项目读取yml文件中值为中文时,在视图页面显示乱码

    这篇文章主要介绍了解决SpringBoot项目读取yml文件中值为中文时,在视图页面显示乱码的问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-08-08
  • Java concurrency之集合_动力节点Java学院整理

    Java concurrency之集合_动力节点Java学院整理

    Java集合主体内容包括Collection集合和Map类;而Collection集合又可以划分为List(队列)和Set(集合),有需要的小伙伴可以参考下
    2017-06-06

最新评论