Java中RabbitMQ队列实现RPC详解

 更新时间:2023年08月29日 08:29:27   作者:轻尘×  
这篇文章主要介绍了Java中RabbitMQ队列实现RPC详解,在本教程中,我们将使用RabbitMQ构建一个RPC系统:一个客户端和一个RPC服务器,我们将创建一个返回斐波那契数字的模拟RPC服务,,需要的朋友可以参考下

RabbitMQ实现RPC

如果我们需要在远程计算机上运行一个函数并等待结果,这种模式通常被称为远程过程调用或RPC。

在本教程中,我们将使用RabbitMQ构建一个RPC系统:

  1. 一个客户端和一个RPC服务器。
  2. 我们将创建一个返回斐波那契数字的模拟RPC服务。

整个过程示意图如下:

这里写图片描述

客户端将请求发送至rpc_queue(我们定义的消息队列),然后等待响应;服务端获取请求,并处理请求,然后将请求结果返回给队列,客户端得知请求被响应后获取结果。

在结果被响应之前,客户端是被阻塞的,主线程会等待RPC响应

如果每个RPC请求都创建一个回调队列。这是非常低效,我们创建一个单一的客户端回调队列。

这引发了一个新的问题,在该队列中收到回复时,不清楚回复属于哪个请求。这就需要用到 correlationId属性。

我们为没有请求设置唯一的correlationId值。

然后,当我们在回调队列中收到一条消息时,我们将获取这个值,将响应与请求的进行correlationId匹配。

如果我们一致就是我们需要的结果,否则就不是。

客户端代RPCClient

代码如下:

package com.adtec.rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc_queue";
    private String replyQueueName;
    public RPCClient() throws IOException, TimeoutException {
        //建立一个连接和一个通道,并为回调声明一个唯一的'回调'队列
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        channel = connection.createChannel();
        //定义一个临时变量的接受队列名    
        replyQueueName = channel.queueDeclare().getQueue();
    }
    //发送RPC请求  
    public String call(String message) throws IOException, InterruptedException {
         //生成一个唯一的字符串作为回调队列的编号
        String corrId = UUID.randomUUID().toString();
        //发送请求消息,消息使用了两个属性:replyto和correlationId
        //服务端根据replyto返回结果,客户端根据correlationId判断响应是不是给自己的
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
                .build();
        //发布一个消息,requestQueueName路由规则
        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
        //由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。
        //这里我们创建的 容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。
        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
        // String basicConsume(String queue, boolean autoAck, Consumer callback)
        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                //检查它的correlationId是否是我们所要找的那个
                if (properties.getCorrelationId().equals(corrId)) {
                    //如果是,则响应BlockingQueue
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });
        return response.take();
    }
    public void close() throws IOException {
        connection.close();
    }
    public static void main(String[] argv) {
        RPCClient fibonacciRpc = null;
        String response = null;
        try {
            fibonacciRpc = new RPCClient();
            System.out.println(" [x] Requesting fib(30)");
            response = fibonacciRpc.call("30");
            System.out.println(" [.] Got '" + response + "'");
        } catch (IOException | TimeoutException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (fibonacciRpc != null) {
                try {
                    fibonacciRpc.close();
                } catch (IOException _ignore) {
                }
            }
        }
    }
}

上面的代码中用到了阻塞队列ArrayBlockingQueue

服务端代RPCServer

代码如下:

package rabbitmq;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";
    //具体处理方法
    private static int fib(int n) {
        if (n == 0)
            return 0;
        if (n == 1)
            return 1;
        return fib(n - 1) + fib(n - 2);
    }
    public static void main(String[] argv) {
         //建立连接、通道,并声明队列 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = null;
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
            channel.basicQos(1);
            System.out.println(" [x] Awaiting RPC requests");
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();
                    String response = "";
                    try {
                        String message = new String(body, "UTF-8");
                        int n = Integer.parseInt(message);
                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    } catch (RuntimeException e) {
                        System.out.println(" [.] " + e.toString());
                    } finally {
                        // 返回处理结果队列
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        //  确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC
                        // server owner thread
                        synchronized (this) {
                            this.notify();
                        }
                    }
                }
            };
            //取消自动确认
            boolean autoAck = false ;
            channel.basicConsume(RPC_QUEUE_NAME, autoAck, consumer);
            // Wait and be prepared to consume the message from RPC client.
            while (true) {
                synchronized (consumer) {
                    try {
                        consumer.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } catch (IOException | TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (connection != null)
                try {
                    connection.close();
                } catch (IOException _ignore) {
                }
        }
    }
}

测试时先运行服务端,再运行客户端 为了方便观察结果,最好将客户端和服务端在不同workspace实现

客户端结果

这里写图片描述

服务端结果

这里写图片描述

到此这篇关于Java中RabbitMQ队列实现RPC详解的文章就介绍到这了,更多相关RabbitMQ实现RPC内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Springboot项目全局异常统一处理案例代码

    Springboot项目全局异常统一处理案例代码

    最近在做项目时需要对异常进行全局统一处理,主要是一些分类入库以及记录日志等,因为项目是基于Springboot的,所以去网络上找了一些博客文档,然后再结合项目本身的一些特殊需求做了些许改造,现在记录下来便于以后查看
    2023-01-01
  • Java使用selenium爬取b站动态的实现方式

    Java使用selenium爬取b站动态的实现方式

    本文主要介绍了Java使用selenium爬取b站动态的实现方式,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2022-01-01
  • java设计模式之代理模式(Porxy)详解

    java设计模式之代理模式(Porxy)详解

    这篇文章主要为大家详细介绍了java设计模式之代理模式Porxy的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-06-06
  • 从Springboot项目中下载文件的具体过程

    从Springboot项目中下载文件的具体过程

    最近在做一个临时的项目,APP端在检测到程序有更新时,需要去后台下载新的安装包,接下来通过本文给大家分享从Springboot项目中下载文件的具体过程,感兴趣的朋友一起看看吧
    2021-07-07
  • 聊聊Spring Boot如何配置多个Kafka数据源

    聊聊Spring Boot如何配置多个Kafka数据源

    这篇文章主要介绍了Spring Boot配置多个Kafka数据源的相关知识,包括生产者、消费者配置,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2023-10-10
  • Spring @Profile注解详解

    Spring @Profile注解详解

    这篇文章主要介绍了Spring @Profile注解详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-08-08
  • Visual Studio Code配置Tomcat运行Java Web项目详细步骤

    Visual Studio Code配置Tomcat运行Java Web项目详细步骤

    VS Code是一款非常棒的文本编辑器,具有配置简单、功能丰富、轻量简洁的特点,并且极其适合处理中小规模的代码,这篇文章主要给大家介绍了关于Visual Studio Code配置Tomcat运行Java Web项目的详细步骤,需要的朋友可以参考下
    2023-11-11
  • Java跳跃游戏实例真题解决思路详解

    Java跳跃游戏实例真题解决思路详解

    这篇文章主要介绍了Java跳跃游戏,总的来说这并不是一道难题,那为什么要拿出这道题介绍?拿出这道题真正想要传达的是解题的思路,以及不断优化探寻最优解的过程。希望通过这道题能给你带来一种解题优化的思路
    2022-10-10
  • java中maven下载和安装步骤说明

    java中maven下载和安装步骤说明

    在本篇文章里小编给大家分享的是一篇关于java中maven下载和安装步骤说明内容,对此有兴趣的朋友们可以学习参考下。
    2021-02-02
  • java实现简单TCP聊天程序

    java实现简单TCP聊天程序

    这篇文章主要为大家详细介绍了java实现简单TCP聊天程,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-07-07

最新评论