详解SpringBoot中使用RabbitMQ的RPC功能

 更新时间:2021年11月15日 16:20:07   作者:黑莹de希望  
这篇文章主要介绍了详解SpringBoot中使用RabbitMQ的RPC功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

一、RabbitMQ的RPC简介

实际业务中,有的时候我们还需要等待消费者返回结果给我们,或者是说我们需要消费者上的一个功能、一个方法或是一个接口返回给我们相应的值,而往往大型的系统软件,生产者跟消费者之间都是相互独立的两个系统,部署在两个不同的电脑上,不能通过直接对象.方法的形式获取想要的结果,这时候我们就需要用到RPC(Remote Procedure Call)远程过程调用方式。
RabbitMQ实现RPC的方式很简单,生产者发送一条带有标签(消息ID(correlation_id)+回调队列名称)的消息到发送队列,消费者(也称RPC服务端)从发送队列获取消息并处理业务,解析标签的信息将业务结果发送到指定的回调队列,生产者从回调队列中根据标签的信息获取发送消息的返回结果。

在这里插入图片描述

如图,客户端C发送消息,指定消息的ID=rpc_id,回调响应的队列名称为rpc_resp,消息从C发送到rpc_request队列,服务端S获取消息业务处理之后,将correlation_id附加到响应的结果发送到指定的回调队列rpc_resp中,客户端从回调队列获取消息,匹配与发送消息的correlation_id相同的值为消息应答结果。

二、SpringBoot中使用RabbitMQ的RPC功能

注意:springboot中使用的时候,correlation_id为系统自动生成的,reply_to在加载AmqpTemplate实例的时候设置的。

实例:
说明:队列1为发送队列,队列2为返回队列

1.先配置rabbitmq

package com.ws.common;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/*
 * rabbitMQ配置类
 */
@Configuration
public class RabbitMQConfig {
	public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    
    @Autowired
    ConnectionFactory connectionFactory;
    
    @Bean(name = "connectionFactory")
    public ConnectionFactory connectionFactory() {
    	CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    	connectionFactory.setHost(host);
    	connectionFactory.setPort(port);
    	connectionFactory.setUsername(username);
    	connectionFactory.setPassword(password);
    	connectionFactory.setVirtualHost("/");
    	return connectionFactory;
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate() {
    	RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    	//设置reply_to(返回队列,只能在这设置)
    	rabbitTemplate.setReplyAddress(TOPIC_QUEUE2);
    	rabbitTemplate.setReplyTimeout(60000);
    	return rabbitTemplate;
    }
    //返回队列监听器(必须有)
    @Bean(name="replyMessageListenerContainer")
    public SimpleMessageListenerContainer createReplyListenerContainer() {
         SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
         listenerContainer.setConnectionFactory(connectionFactory);
         listenerContainer.setQueueNames(TOPIC_QUEUE2);
         listenerContainer.setMessageListener(rabbitTemplate());
         return listenerContainer;
    }
    

    
    //创建队列
    @Bean
    public Queue topicQueue1() {
        return new Queue(TOPIC_QUEUE1);
    }
    @Bean
    public Queue topicQueue2() {
        return new Queue(TOPIC_QUEUE2);
    }
    
    //创建交换机
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }
    
    //交换机与队列进行绑定
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_QUEUE1);
    }
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_QUEUE2);
    }
}

2.发送消息并同步等待返回值

@Autowired
private RabbitTemplate rabbitTemplate;


//报文body
String sss = "报文的内容";
//封装Message
Message msg = this.con(sss);
log.info("客户端--------------------"+msg.toString());
//使用sendAndReceive方法完成rpc调用
Message message=rabbitTemplate.sendAndReceive(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE1, msg);
//提取rpc回应内容body
String response = new String(message.getBody());
log.info("回应:" + response);
log.info("rpc完成---------------------------------------------");


public Message con(String s) {
	MessageProperties mp = new MessageProperties();
	byte[] src = s.getBytes(Charset.forName("UTF-8"));
	//mp.setReplyTo("adsdas");   加载AmqpTemplate时设置,这里设置没用
	//mp.setCorrelationId("2222");   系统生成,这里设置没用
	mp.setContentType("application/json");
	mp.setContentEncoding("UTF-8");
	mp.setContentLength((long)s.length());
	return new Message(src, mp);
} 

3.写消费者

package com.ws.listener.mq;

import java.nio.charset.Charset;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.ws.common.RabbitMQConfig;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class Receiver {
	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	@RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE1)
	public void receiveTopic1(Message msg) {
		log.info("队列1:"+msg.toString());
		String msgBody = new String(msg.getBody());
		//数据处理,返回的Message
		Message repMsg = con(msgBody+"返回了", msg.getMessageProperties().getCorrelationId());
		
		rabbitTemplate.send(RabbitMQConfig.TOPIC_EXCHANGE, RabbitMQConfig.TOPIC_QUEUE2, repMsg);
		
    }
	@RabbitListener(queues=RabbitMQConfig.TOPIC_QUEUE2)
	public void receiveTopic2(Message msg) {
		log.info("队列2:"+msg.toString());
		
    }
	
	public Message con(String s, String id) {
		MessageProperties mp = new MessageProperties();
		byte[] src = s.getBytes(Charset.forName("UTF-8"));
		mp.setContentType("application/json");
		mp.setContentEncoding("UTF-8");
		mp.setCorrelationId(id);
		
		return new Message(src, mp);
	} 
}

日志打印:

2019-06-26 17:11:16.607 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 客户端--------------------(Body:‘报文的内容' MessageProperties [headers={}, contentType=application/json, contentEncoding=UTF-8, contentLength=5, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])

2019-06-26 17:11:16.618 [SimpleAsyncTaskExecutor-1] INFO com.ws.listener.mq.Receiver - 队列1:(Body:‘报文的内容' MessageProperties [headers={}, correlationId=1, replyTo=topic.queue2, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=topic.exchange, receivedRoutingKey=topic.queue1, deliveryTag=1, consumerTag=amq.ctag-8IzlhblYmTebqUYd-uferw, consumerQueue=topic.queue1])

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - 回应:报文的内容返回了

2019-06-26 17:11:16.623 [http-nio-8080-exec-4] INFO com.ws.controller.UserController - rpc完成---------------------------------------------

到此这篇关于SpringBoot中使用RabbitMQ的RPC功能的文章就介绍到这了,更多相关SpringBoot使用RabbitMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java自动化测试中多数据源的切换(实例讲解)

    Java自动化测试中多数据源的切换(实例讲解)

    下面小编就为大家带来一篇Java自动化测试中多数据源的切换(实例讲解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-10-10
  • 如何将mybatis配置到springmvc中

    如何将mybatis配置到springmvc中

    为了更方便的连接数据库,将mybatis配置到springMVC中。接下来通过本文给大家分享如何将mybatis配置到springmvc中,需要的朋友参考下吧
    2017-11-11
  • mybatis实现获取入参是List和Map的取值

    mybatis实现获取入参是List和Map的取值

    这篇文章主要介绍了mybatis实现获取入参是List和Map的取值问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-06-06
  • Java编写迷宫小游戏

    Java编写迷宫小游戏

    最近经常在机房看同学在玩一个走迷宫的游戏,比较有趣,自己也用java写一个实现随机生成迷宫的算法,其实就是一个图的深度优先遍历算法.
    2016-05-05
  • 关于mybatis3中@SelectProvider的使用问题

    关于mybatis3中@SelectProvider的使用问题

    这篇文章主要介绍了mybatis3中@SelectProvider的使用技巧,@SelectProvide指定一个Class及其方法,并且通过调用Class上的这个方法来获得sql语句,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2021-12-12
  • idea启动Tomcat时控制台乱码的解决方法(亲测有效)

    idea启动Tomcat时控制台乱码的解决方法(亲测有效)

    最近在idea中启动tomcat出现控制台乱码问题,尝试了很多方法,最后终于解决了,所以下面这篇文章主要给大家介绍了关于idea启动Tomcat时控制台乱码的解决方法,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • java多态性中的Overload和Override区别详解

    java多态性中的Overload和Override区别详解

    这篇文章主要介绍了java多态性中的Overload和Override区别详解,重写(Overriding)是父类与子类之间多态性的一种表现,而重载(Overloading)是一个类中多态性的一种表现,需要的朋友可以参考下
    2023-07-07
  • SpringBoot整合Mybatis自定义拦截器不起作用的处理方案

    SpringBoot整合Mybatis自定义拦截器不起作用的处理方案

    这篇文章主要介绍了SpringBoot整合Mybatis自定义拦截器不起作用的处理方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • 四种引用类型在JAVA Springboot中的使用详解

    四种引用类型在JAVA Springboot中的使用详解

    这篇文章主要介绍了springboot的四种引用类型,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-09-09
  • Java之NIO基本简介

    Java之NIO基本简介

    这篇文章主要介绍了Java之NIO基本简介,文中给大家讲到了NIO 与 BIO的比较结合实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2023-05-05

最新评论