SpringBoot整合RabbitMQ实现RPC远程调用功能

 更新时间:2023年06月12日 11:40:45   作者:霁晨晨晨  
在分布式系统中,RPC(Remote Procedure Call)是一种常用的通信机制,它可以让不同的节点之间像调用本地函数一样进行函数调用,隐藏了底层的网络通信细节,通过本教程,你可以了解RPC的基本原理以及如何使用Java实现一个简单的RPC客户端和服务端

1. 交互过程

  • 启动 RPC 服务端和客户端,创建连接和通道。
  • 声明请求队列和回复队列,确保使用相同的队列名称。
  • 客户端发送请求:客户端将请求消息发送到指定的请求队列中。
  • 服务端监听请求队列:服务端在指定的请求队列上监听请求消息。
  • 服务端接收请求:服务端接收到客户端发送的请求消息。
  • 服务端处理请求:服务端根据请求消息中的参数,执行相应的业务逻辑,并得到处理结果。
  • 服务端发送响应:服务端将处理结果作为响应消息发送到客户端指定的回复队列中。
  • 客户端监听响应队列:客户端在指定的回复队列上监听响应消息。
  • 客户端接收响应:客户端接收到服务端发送的响应消息。
  • 客户端处理响应:客户端根据响应消息中的结果进行相应的处理。

2. 导入依赖

创建一个SpringBoot项目并导入依赖坐标

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.10.RELEASE</version>
    <relativePath/>
</parent>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.3.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.3.10.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.12.0</version>
    </dependency>
</dependencies>

application.yml

spring:
  rabbitmq:
    host: localhost # 主机名
    port: 5672 # 端口
    username: guest # 用户名
    password: guest # 密码
    virtual-host: /
    template:
      receive-timeout: 2000
      reply-timeout: 2000
    listener:
      simple:
        concurrency: 1
        max-concurrency: 3
        prefetch: 1 # 消费者每次只能预取1条数据到内存并处理,默认为250条
        acknowledge-mode: manual # 确定机制 manual:手动确认
    publisher-returns: true
    publisher-confirm-type: correlated

注意:需要提前开启RabbitMQ服务,否则项目运行会报错

3. RPC 服务端

首先,我们来看一下RPC服务端的代码。

package com.rabbit.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * rpc服务端
 */
public class RPCServer {
    // 定义请求队列常量
    private final static String REQUEST_QUEUE_NAME = "rpc_queue";
    // 定义回复队列常量
    private final static String REPLY_QUEUE_NAME = "rpc_reply_queue";
    /**
     * 服务端启动入口
     */
    public static void main(String[] args) {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        //factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明请求队列和回复队列
            channel.queueDeclare(REQUEST_QUEUE_NAME, false, false, false, null);
            channel.queueDeclare(REPLY_QUEUE_NAME, false, false, false, null);
            // 每次仅接收一条未经确认的消息
            channel.basicQos(1);
            // 构建消费者属性
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 获取消息
                    String message = new String(body, StandardCharsets.UTF_8);
                    // 进行业务处理构建响应数据 (这里做字符串拼接模拟响应数据)
                    String response = message + ":::";
                    // 构造响应基本属性
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId())   // 设置关联id
                            .build();
                    // 发送响应数据到回复队列 (rpc_reply_queue)
                    channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));
                    // 手动回执消息确认消费
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 消费rpc_queue队列的消息
            channel.basicConsume(REQUEST_QUEUE_NAME, false, consumer);
            // 持续监听请求消息
            while (true) {
                Thread.sleep(50);
            }
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在RPC服务端的代码中,我们首先创建了一个连接和一个通道,然后声明了请求队列和回复队列。我们设置每一次只接收一条未确认的消息,并创建了一个消费者对象,用于处理接收到的消息。

在handleDelivery方法中,我们从消息中获取请求数据,并进行业务处理,然后构造响应数据并发送到回复队列。最后,我们手动确认消费,并继续监听请求消息。

4. RPC 客户端

接下来,我们看一下RPC客户端的代码。

package com.rabbit.rpc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
 * rpc客户端
 */
public class RPCClient {
    // 定义请求队列常量
    private final static String REQUEST_QUEUE_NAME = "rpc_queue";
    // 定义回复队列常量
    private final static String REPLY_QUEUE_NAME = "rpc_reply_queue";
    /**
     * 客户端启动入口
     */
    public static void main(String[] args) {
        System.out.println("Response1: " + call("Hello, RPC Server1"));
        System.out.println("Response2: " + call("Hello, RPC Server2"));
        System.out.println("Response3: " + call("Hello, RPC Server3"));
    }
    /**
     * 发送请求到队列并返回响应数据
     *
     * @param message 请求消息
     * @return 响应数据
     */
    public static String call(String message) {
        // 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        //factory.setHost("localhost");
        String result = "";
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 声明请求队列和回复队列
            channel.queueDeclare(REQUEST_QUEUE_NAME, false, false, false, null);
            channel.queueDeclare(REPLY_QUEUE_NAME, false, false, false, null);
            // 每次仅接收一条未经确认的消息
            channel.basicQos(1);
            // 生成关联id
            String correlationId = UUID.randomUUID().toString();
            // 构造请求基本属性
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                    .correlationId(correlationId)   // 设置关联id
                    .replyTo(REPLY_QUEUE_NAME)      // 设置回复队列 (rpc_reply_queue)
                    .build();
            // 发送请求消息到rpc_queue队列
            channel.basicPublish("", REQUEST_QUEUE_NAME, props, message.getBytes(StandardCharsets.UTF_8));
            // 用于保存响应消息的阻塞队列
            final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
            // 监听回复队列接收响应消息
            String consumerTag = channel.basicConsume(REPLY_QUEUE_NAME, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                    // 通过关联id获取到响应数据
                    if (properties.getCorrelationId().equals(correlationId)) {
                        response.offer(new String(body, StandardCharsets.UTF_8));
                    }
                }
            });
            // 清空阻塞队列
            response.clear();
            // 等待接收响应消息
            result = response.take();
            // 取消消费者的监听
            channel.basicCancel(consumerTag);
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        }
        return result;
    }
}

在RPC客户端的代码中,我们同样创建了一个连接和一个通信,并声明了请求队列和回复队列。然后,我们设置每次只接收一条未确认的消息,并创建了一个唯一的关联id。

在call方法中,我们构建了请求消息的基础属性,并将请求消息发送到请求队列。接下来,我们创建了一个阻塞队列,用于保存response响应消息。

通过监听回复队列接收响应消息,并通过关联id接收到对应的响应数据,然后将其放入response队列中。最后,通过阻塞队列接收响应消息,并返回响应数据。

5. 运行代码

现在,我们可以运行RPC服务端和客户端的代码了。首先运行服务端代码,它会启动一个监听请求的进程。然后,运行客户端代码,它会发送请求消息并等待接收响应消息。

最终会看到客户端发出了三次请求,并打印了对应的响应数据。

项目地址:rabbitmq-rpc: RabbitMQ实现RPC远程调用功能练习Demo (gitee.com)

以上就是SpringBoot整合RabbitMQ实现RPC远程调用功能的详细内容,更多关于SpringBoot RabbitMQ RPC远程调用的资料请关注脚本之家其它相关文章!

相关文章

  • java自动生成ID号的方法

    java自动生成ID号的方法

    这篇文章主要介绍了java自动生成ID号的方法,涉及java生成ID号的技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-03-03
  • SpringBoot整合OpenCV的实现示例

    SpringBoot整合OpenCV的实现示例

    这篇文章主要介绍了SpringBoot整合OpenCV的实现示例。文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-12-12
  • 利用Spring Data MongoDB持久化文档数据的方法教程

    利用Spring Data MongoDB持久化文档数据的方法教程

    这篇文章主要给大家介绍了关于利用Spring Data MongoDB持久化文档数据的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考借鉴,下面来一起看看吧。
    2017-08-08
  • mybatis in查询条件过长的解决方案

    mybatis in查询条件过长的解决方案

    这篇文章主要介绍了mybatis in查询条件过长的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10
  • java 对象数组排序

    java 对象数组排序

    当遇到数组排序时,我们经常会使用学过的几种排序方法,而java 本身提供了Arrays.sort,在数据元素较少或者对效率要求不是抬高时,直接使用Arrays.sort来的更容易。查看一下源码后Arrays.sort 本身采用的是快速排序。
    2015-04-04
  • Spring发送邮件如何内嵌图片增加附件

    Spring发送邮件如何内嵌图片增加附件

    这篇文章主要介绍了Spring发送邮件如何内嵌图片增加附件,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-10-10
  • 详解Springboot 优雅停止服务的几种方法

    详解Springboot 优雅停止服务的几种方法

    这篇文章主要介绍了详解Springboot 优雅停止服务的几种方法 ,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • Spring使用@Conditional进行条件装配的实现

    Spring使用@Conditional进行条件装配的实现

    在spring中有些bean需要满足某些环境条件才创建某个bean,这个时候可以在bean定义上使用@Conditional注解来修饰,所以本文给大家介绍了Spring使用@Conditional进行条件装配的实现,文中通过代码示例给大家介绍的非常详细,需要的朋友可以参考下
    2023-12-12
  • java9新特性Collection集合类的增强与优化方法示例

    java9新特性Collection集合类的增强与优化方法示例

    这篇文章主要为大家介绍了java9新特性Collection集合类的增强与优化方法示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步
    2022-03-03
  • SpringMVC实现文件上传下载功能

    SpringMVC实现文件上传下载功能

    这篇文章主要为大家详细介绍了springMVC实现文件上传和下载的方法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-05-05

最新评论