在RabbitMQ中实现Work queues工作队列模式

 更新时间:2021年04月16日 15:00:37   作者:Java_Caiyo  
这篇文章主要介绍了如何在RabbitMQ中实现Work queues模式,代码详细,解释清晰,可以帮助大家更好理解java,对这方面感兴趣的朋友可以参考下

一、模式说明

Work Queues 与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景 :对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

二、代码

Work Queues 与入门程序的 简单模式 的代码是几乎一样的:可以完全复制,并复制多一个消费者进行多个消费者同时消费消息的测试。

①生产者

package com.itheima.rabbitmq.work; 
import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.Channel; 
import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.ConnectionFactory; 
public class Producer { 
	static final String QUEUE_NAME = "work_queue"; 
	public static void main(String[] args) throws Exception { 
		//创建连接 
		Connection connection = ConnectionUtil.getConnection(); 
		// 创建频道 
		Channel channel = connection.createChannel(); 
		// 声明(创建)队列 
		/**
		 * 参数1:队列名称 
		 * 参数2:是否定义持久化队列 
		 * 参数3:是否独占本次连接 
		 * 参数4:是否在不使用的时候自动删除队列 
		 * 参数5:队列其它参数 
		*/ 
		channel.queueDeclare(QUEUE_NAME, true, false, false, null); 
		for (int i = 1; i <= 30; i++) { 
			// 发送信息 
			String message = "你好;小兔子!work模式--" + i; 
			/**
			 * 参数1:交换机名称,如果没有指定则使用默认Default Exchage 
			 * 参数2:路由key,简单模式可以传递队列名称 
			 * 参数3:消息其它属性 
			 * 参数4:消息内容 
			*/ 
			channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 
			System.out.println("已发送消息:" + message); 
		}
		// 关闭资源 
		channel.close(); connection.close(); 
	} 
}

②消费者1

package com.itheima.rabbitmq.work; 
import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*;
import java.io.IOException; 
public class Consumer1 { 
	public static void main(String[] args) throws Exception { 
		Connection connection = ConnectionUtil.getConnection(); 
		// 创建频道 
		Channel channel = connection.createChannel(); 
		// 声明(创建)队列 
		/**
		 * 参数1:队列名称 
		 * 参数2:是否定义持久化队列 
		 * 参数3:是否独占本次连接 
		 * 参数4:是否在不使用的时候自动删除队列 
		 * 参数5:队列其它参数 
		*/ 
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); 
		//一次只能接收并处理一个消息 
		channel.basicQos(1); 
		//创建消费者;并设置消息处理 
		DefaultConsumer consumer = new DefaultConsumer(channel){ 
			@Override 
			/**
			 * consumerTag 消息者标签,在channel.basicConsume时候可以指定 
			 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) 
			 * properties 属性信息 
			 * body 消息 
			*/ 
			public void handleDelivery(String consumerTag, Envelope envelope, 
					AMQP.BasicProperties properties, byte[] body) throws IOException { 
				try {
					//路由key 
					System.out.println("路由key为:" + envelope.getRoutingKey()); 
					//交换机 
					System.out.println("交换机为:" + envelope.getExchange()); 
					//消息id 
					System.out.println("消息id为:" + envelope.getDeliveryTag()); 
					//收到的消息 
					System.out.println("消费者1-接收到的消息为:" + new String(body, "utf-8")); 
					Thread.sleep(1000); 
					//确认消息 
					channel.basicAck(envelope.getDeliveryTag(), false); 
				} 
				catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
			} 
		};
		//监听消息 
		/**
		 * 参数1:队列名称
		 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 
		 * 参数3:消息接收到后回调 
		*/ 
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer); 
	} 
}

③消费者2

package com.itheima.rabbitmq.work; 
import com.itheima.rabbitmq.util.ConnectionUtil; 
import com.rabbitmq.client.*; 
import java.io.IOException; 
public class Consumer2 { 
	public static void main(String[] args) throws Exception { 
		Connection connection = ConnectionUtil.getConnection(); 
		// 创建频道 
		Channel channel = connection.createChannel(); 
		// 声明(创建)队列 
		/**
		 * 参数1:队列名称 
		 * 参数2:是否定义持久化队列 
		 * 参数3:是否独占本次连接 
		 * 参数4:是否在不使用的时候自动删除队列 
		 * 参数5:队列其它参数 
		*/ 
		channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null); 
		//一次只能接收并处理一个消息 
		channel.basicQos(1); 
		//创建消费者;并设置消息处理 
		DefaultConsumer consumer = new DefaultConsumer(channel){ 
			@Override 
			/**
			 * consumerTag 消息者标签,在channel.basicConsume时候可以指定 
			 * envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志(收到消息失败后是否需要重新发送) 
			 * properties 属性信息 
			 * body 消息 
			*/ 
			public void handleDelivery(String consumerTag, Envelope envelope, 
					AMQP.BasicProperties properties, byte[] body) throws IOException { 
				try {
					//路由key 
					System.out.println("路由key为:" + envelope.getRoutingKey()); 
					//交换机 
					System.out.println("交换机为:" + envelope.getExchange()); 
					//消息id 
					System.out.println("消息id为:" + envelope.getDeliveryTag());
					//收到的消息 
					System.out.println("消费者2-接收到的消息为:" + new String(body, "utf-8")); 
					Thread.sleep(1000); 
					//确认消息 
					channel.basicAck(envelope.getDeliveryTag(), false); 
				} catch (InterruptedException e) { 
					e.printStackTrace(); 
				} 
			} 
		};
		//监听消息 
		/**
		 * 参数1:队列名称 
		 * 参数2:是否自动确认,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动确认 
		 * 参数3:消息接收到后回调 
		*/ 
		channel.basicConsume(Producer.QUEUE_NAME, false, consumer); 
	} 
}

三、测试

启动两个消费者,然后再启动生产者发送消息;到IDEA的两个消费者对应的控制台查看是否竞争性的接收到消息。

总结

在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。

到此这篇关于如何在RabbitMQ中实现Work queues模式的文章就介绍到这了,希望对你有所帮助,更多相关RabbitMQ内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!

相关文章

  • Java 如何遍历JsonObject对象

    Java 如何遍历JsonObject对象

    这篇文章主要介绍了Java 如何遍历JsonObject对象?今天小编就为大家分享一篇Java遍历JsonObject对象案例,希望对大家有所帮助吧
    2021-01-01
  • 基于Java实现图片相似度对比的示例代码

    基于Java实现图片相似度对比的示例代码

    很多时候我们需要将两个图片进行对比,确定两个图片的相似度。本文将利用Java和OpenCV库实现图片相似度对比,感兴趣的可以动手尝试一下
    2022-07-07
  • java8快速实现List转map 、分组、过滤等操作

    java8快速实现List转map 、分组、过滤等操作

    这篇文章主要介绍了java8快速实现List转map 、分组、过滤等操作,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2019-09-09
  • Java Lambda表达式详解

    Java Lambda表达式详解

    这篇文章主要介绍了Java Lambda表达式详解,包括了Java Lambda表达式创建线程,Java Lambda表达式的语法,Java lambda遍历List集合,Java lambda过滤String需要的朋友可以参考下
    2023-02-02
  • 简单了解Spring Boot及idea整合jsp过程解析

    简单了解Spring Boot及idea整合jsp过程解析

    这篇文章主要介绍了简单了解Spring Boot及idea整合jsp过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • Java中的阻塞队列BlockingQueue使用详解

    Java中的阻塞队列BlockingQueue使用详解

    这篇文章主要介绍了Java中的阻塞队列BlockingQueue使用详解,阻塞队列是一种线程安全的数据结构,用于在多线程环境下进行数据交换,它提供了一种阻塞的机制,当队列为空时,消费者线程将被阻塞,直到队列中有数据可供消费,需要的朋友可以参考下
    2023-10-10
  • Java+OpenCV调用摄像头实现拍照功能

    Java+OpenCV调用摄像头实现拍照功能

    随着我们对环境、Mat基本使用越来越熟练、Java Swing也逐步熟悉了起来。本文将通过OpenCV驱动摄像头实现识脸和拍照功能,需要的可以参考一下
    2022-03-03
  • Java内存模型JMM详解

    Java内存模型JMM详解

    这篇文章主要介绍了Java内存模型JMM详解,涉及volatile和监视器锁,final字段,内存屏障等相关内容,具有一定参考价值,需要的朋友可以了解下。
    2017-11-11
  • ZooKeeper框架教程Curator分布式锁实现及源码分析

    ZooKeeper框架教程Curator分布式锁实现及源码分析

    本文是ZooKeeper入门系列教程,本篇为大家介绍zookeeper一个优秀的框架Curator,提供了各种分布式协调的服务,Curator中有着更为标准、规范的分布式锁实现
    2022-01-01
  • 聊聊DecimalFormat的用法及各符号的意义

    聊聊DecimalFormat的用法及各符号的意义

    这篇文章主要介绍了DecimalFormat的用法及各符号的意义,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-10-10

最新评论