From 957580ac241599f610fe0b666367ea5ea17a09c0 Mon Sep 17 00:00:00 2001 From: dongtiandexue Date: Thu, 4 Feb 2021 19:23:24 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=B7=BB=E5=8A=A0rabbitmq=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../rabbitmq/config/RabbitMqConfig.java | 34 ++++---- .../demo/exchangeToExchange/Recv.java | 84 ++++++++++++++++++ .../demo/exchangeToExchange/Send.java | 46 ++++++++++ .../rabbitmq/demo/headers/HeadersRecv.java | 87 +++++++++++++++++++ .../rabbitmq/demo/headers/HeadersSend.java | 50 +++++++++++ .../demo/manager/ConnectionManager.java | 33 +++++++ .../rabbitmq/demo/pubSub/PubSubRecv.java | 56 ++++++++++++ .../rabbitmq/demo/pubSub/PubSubSend.java | 42 +++++++++ .../rabbitmq/demo/routing/RoutingRecv.java | 61 +++++++++++++ .../rabbitmq/demo/routing/RoutingSend.java | 40 +++++++++ .../rabbitmq/demo/simple/SimpleRecv.java | 34 ++++++++ .../rabbitmq/demo/simple/SimpleSend.java | 42 +++++++++ .../common/rabbitmq/demo/topic/TopicRecv.java | 71 +++++++++++++++ .../common/rabbitmq/demo/topic/TopicSend.java | 42 +++++++++ .../rabbitmq/demo/work/ConsumerReceive1.java | 67 ++++++++++++++ .../rabbitmq/demo/work/WorkerRecv2.java | 80 +++++++++++++++++ .../common/rabbitmq/demo/work/WorkerSend.java | 59 +++++++++++++ 17 files changed, 910 insertions(+), 18 deletions(-) create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/exchangeToExchange/Recv.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/exchangeToExchange/Send.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/headers/HeadersRecv.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/headers/HeadersSend.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/manager/ConnectionManager.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/pubSub/PubSubRecv.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/pubSub/PubSubSend.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/routing/RoutingRecv.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/routing/RoutingSend.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/simple/SimpleRecv.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/simple/SimpleSend.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/topic/TopicRecv.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/topic/TopicSend.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/ConsumerReceive1.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/WorkerRecv2.java create mode 100644 youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/WorkerSend.java diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/config/RabbitMqConfig.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/config/RabbitMqConfig.java index 009eb9de5..f877cbf29 100644 --- a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/config/RabbitMqConfig.java +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/config/RabbitMqConfig.java @@ -1,7 +1,7 @@ package com.youlai.common.rabbitmq.config; import lombok.extern.slf4j.Slf4j; -import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; @@ -33,16 +33,17 @@ public class RabbitMqConfig { * @return */ @Bean - public MessageConverter messageConverter() { + public MessageConverter jackson2MessageConverter() { return new Jackson2JsonMessageConverter(); } /** - * 为容器创建号rabbitTemplate注册confirmCallback - * 消息由生产者投递到Broker/Exchange回调 + * 生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心 + * 步骤1:yaml文件中添加配置 spring.rabbitmq.publisher-confirm-type: correlated + * 步骤2:编写代码 */ @PostConstruct - public void setExchangeCallback() { + public void setConfirmCallback() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @@ -63,27 +64,24 @@ public class RabbitMqConfig { } /** + * * 注意下面两项必须同时配置,可以尝试不配置第二项,通过测试能够发现当消息路由到Queue失败(比如路由件错误)时,returnCallback并未被回调。 * # 开启阶段二(消息从E->Q)的确认回调 Exchange --> Queue returnCallback * spring.rabbitmq.publisher-returns=true - * # 官方文档说此时这一项必须设置为true - * # 实际上这一项的作用是:消息【未成功到达】队列时,能监听到到路由不可达的消息,以异步方式优先调用我们自己设置的returnCallback,默认情况下,这个消息会被直接丢弃,无法监听到 + * + * #为true,则交换机处理消息到路由失败,则会返回给生产者 * spring.rabbitmq.template.mandatory=true */ @PostConstruct public void setQueueCallback() { - rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { - /** - * 这个方法的参数并没有像 confirmCallback 那样提供boolean类型的ack,因此这个回调只能在【失败】情况下触发 - * @param message 发送消息 - * @param replyCode 回复错误码 - * @param replyText 回复错误内容 - * @param exchange 发送消息时指定的交换机 - * @param routingKey 发送消息时使用的路由件 - */ + rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { + @Override - public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { - log.error("路由到队列失败,[消息内容:{},交换机:{},路由件:{},回复码:{},回复文本:{}]", message, exchange, routingKey, replyCode, replyText); + public void returnedMessage(ReturnedMessage returnedMessage) { + log.error("路由到队列失败,[消息内容:{},交换机:{},路由件:{},回复码:{},回复文本:{}]", + returnedMessage.getMessage(), returnedMessage.getExchange(), + returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(), returnedMessage.getReplyText()); + } }); } diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/exchangeToExchange/Recv.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/exchangeToExchange/Recv.java new file mode 100644 index 000000000..b5ad6e14f --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/exchangeToExchange/Recv.java @@ -0,0 +1,84 @@ +package com.youlai.common.rabbitmq.demo.exchangeToExchange; + +import com.rabbitmq.client.Channel; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ headers路由模型 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class Recv { + + public static void main(String[] args) throws IOException, TimeoutException { + Send.declareExchanges(); + Send.declareExchangesBindings(); + Recv.declareQueues(); + Recv.declareQueueBindings(); + + Channel channel = ConnectionManager.getConnection().createChannel(); + + channel.basicConsume("MobileQ", true, (consumerTag, message) -> { + System.out.println("\n\n" + message.getEnvelope()); + System.out.println("MobileQ:" + new String(message.getBody())); + }, consumerTag -> { + System.out.println(consumerTag); + }); + channel.basicConsume("ACQ", true, (consumerTag, message) -> { + System.out.println("\n\n" + message.getEnvelope()); + System.out.println("ACQ:" + new String(message.getBody())); + }, consumerTag -> { + System.out.println(consumerTag); + }); + channel.basicConsume("LightQ", true, (consumerTag, message) -> { + System.out.println("\n\n" + message.getEnvelope()); + System.out.println("LightQ:" + new String(message.getBody())); + }, consumerTag -> { + System.out.println(consumerTag); + }); + + channel.basicConsume("LaptopQ", true, (consumerTag, message) -> { + System.out.println("\n\n" + message.getEnvelope()); + System.out.println("LaptopQ:" + new String(message.getBody())); + }, consumerTag -> { + System.out.println(consumerTag); + }); + channel.basicConsume("FanQ", true, (consumerTag, message) -> { + System.out.println("\n\n" + message.getEnvelope()); + System.out.println("FanQ:" + new String(message.getBody())); + }, consumerTag -> { + System.out.println(consumerTag); + }); + + } + + //Declare the Queues + public static void declareQueues() throws IOException, TimeoutException { + //Create a channel - do not share the Channel instance + Channel channel = ConnectionManager.getConnection().createChannel(); + //Create the Queues for linked-direct-exchange + channel.queueDeclare("MobileQ", true, false, false, null); + channel.queueDeclare("ACQ", true, false, false, null); + channel.queueDeclare("LightQ", true, false, false, null); + //Create the Queues for home-direct-exchange + channel.queueDeclare("FanQ", true, false, false, null); + channel.queueDeclare("LaptopQ", true, false, false, null); + channel.close(); + } + //Create the Bindings between respective Queues and Exchanges + public static void declareQueueBindings() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + //Create bindings - (queue, exchange, routingKey) + channel.queueBind("MobileQ", "linked-direct-exchange", "personalDevice"); + channel.queueBind("ACQ", "linked-direct-exchange", "homeAppliance"); + channel.queueBind("LightQ", "linked-direct-exchange", "homeAppliance"); + //Create the bindings - with home-direct-exchange + channel.queueBind("FanQ", "home-direct-exchange", "homeAppliance"); + channel.queueBind("LaptopQ", "home-direct-exchange", "personalDevice"); + channel.close(); + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/exchangeToExchange/Send.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/exchangeToExchange/Send.java new file mode 100644 index 000000000..45f11be55 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/exchangeToExchange/Send.java @@ -0,0 +1,46 @@ +package com.youlai.common.rabbitmq.demo.exchangeToExchange; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ headers模型 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class Send { + + public static void main(String[] args) throws IOException, TimeoutException { + Send.declareExchanges(); + Send.declareExchangesBindings(); + try (Channel channel = ConnectionManager.getConnection().createChannel()) { + String message = "Direct message - Turn on the Home Appliances "; + channel.basicPublish("home-direct-exchange", "homeAppliance", null, message.getBytes()); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + //Declare the exchanges + public static void declareExchanges() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + //Declare both the exchanges - linked-direct-exchange and home-direct-exchange. + channel.exchangeDeclare("linked-direct-exchange", BuiltinExchangeType.DIRECT, true); + channel.exchangeDeclare("home-direct-exchange", BuiltinExchangeType.DIRECT, true); + channel.close(); + } + + //Create the Bindings between Exchanges. + public static void declareExchangesBindings() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + // (destination-exchange, source-exchange , routingKey + channel.exchangeBind("linked-direct-exchange", "home-direct-exchange", "homeAppliance"); + channel.close(); + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/headers/HeadersRecv.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/headers/HeadersRecv.java new file mode 100644 index 000000000..a09e094bc --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/headers/HeadersRecv.java @@ -0,0 +1,87 @@ +package com.youlai.common.rabbitmq.demo.headers; + +import com.rabbitmq.client.Channel; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ headers路由模型 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class HeadersRecv { + + public static void main(String[] args) throws IOException, TimeoutException { + HeadersSend.declareExchange(); + HeadersRecv.declareQueues(); + HeadersRecv.declareBindings(); + + Channel channel = ConnectionManager.getConnection().createChannel(); + + channel.basicConsume("HealthQueue", true, ((consumerTag, message) -> { + System.out.println("\n\n=========== Health Queue =========="); + System.out.println(consumerTag); + System.out.println("HealthQueue: " + new String(message.getBody())); + System.out.println(message.getEnvelope()); + }), consumerTag -> { + System.out.println(consumerTag); + }); + channel.basicConsume("SportsQueue", true, ((consumerTag, message) -> { + System.out.println("\n\n ============ Sports Queue =========="); + System.out.println(consumerTag); + System.out.println("SportsQueue: " + new String(message.getBody())); + System.out.println(message.getEnvelope()); + }), consumerTag -> { + System.out.println(consumerTag); + }); + channel.basicConsume("EducationQueue", true, ((consumerTag, message) -> { + System.out.println("\n\n ============ Education Queue =========="); + System.out.println(consumerTag); + System.out.println("EducationQueue: " + new String(message.getBody())); + System.out.println(message.getEnvelope()); + }), consumerTag -> { + System.out.println(consumerTag); + }); + + } + + public static void declareQueues() throws IOException, TimeoutException { + //Create a channel - do no't share the Channel instance + Channel channel = ConnectionManager.getConnection().createChannel(); + //queueDeclare - (queueName, durable, exclusive, autoDelete, arguments) + channel.queueDeclare("HealthQueue", true, false, false, null); + channel.queueDeclare("SportsQueue", true, false, false, null); + channel.queueDeclare("EducationQueue", true, false, false, null); + + channel.close(); + } + + public static void declareBindings() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + //Create bindings - (queue, exchange, routingKey, headers) - routingKey != null + Map healthArgs = new HashMap<>(); + healthArgs.put("x-match", "any"); //Match any of the header + healthArgs.put("h1", "Header1"); + healthArgs.put("h2", "Header2"); + channel.queueBind("HealthQ", "my-header-exchange", "", healthArgs); + + Map sportsArgs = new HashMap<>(); + sportsArgs.put("x-match", "all"); //Match all of the header + sportsArgs.put("h1", "Header1"); + sportsArgs.put("h2", "Header2"); + channel.queueBind("SportsQ", "my-header-exchange", "", sportsArgs); + + Map educationArgs = new HashMap<>(); + educationArgs.put("x-match", "any"); //Match any of the header + educationArgs.put("h1", "Header1"); + educationArgs.put("h2", "Header2"); + channel.queueBind("EducationQ", "my-header-exchange", "", educationArgs); + channel.close(); + + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/headers/HeadersSend.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/headers/HeadersSend.java new file mode 100644 index 000000000..23c7884f7 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/headers/HeadersSend.java @@ -0,0 +1,50 @@ +package com.youlai.common.rabbitmq.demo.headers; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ headers模型 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class HeadersSend { + + public static void main(String[] args) throws IOException, TimeoutException { + HeadersSend.declareExchange(); + try (Channel channel = ConnectionManager.getConnection().createChannel()) { + + String message = "Header Exchange example 1"; + Map headerMap = new HashMap<>(); + headerMap.put("h1", "Header1"); + headerMap.put("h3", "Header3"); + AMQP.BasicProperties properties = new AMQP.BasicProperties() + .builder().headers(headerMap).build(); + channel.basicPublish("my-header-exchange", "", properties, message.getBytes()); + + message = "Header Exchange example 2"; + headerMap.put("h2", "Header2"); + properties = new AMQP.BasicProperties() + .builder().headers(headerMap).build(); + channel.basicPublish("my-header-exchange", "", properties, message.getBytes()); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void declareExchange() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + //Declare my-fanout-exchange + channel.exchangeDeclare("my-header-exchange", BuiltinExchangeType.HEADERS, true); + channel.close(); + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/manager/ConnectionManager.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/manager/ConnectionManager.java new file mode 100644 index 000000000..8d92256b9 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/manager/ConnectionManager.java @@ -0,0 +1,33 @@ +package com.youlai.common.rabbitmq.demo.manager; + +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc 创建 Connection 连接工厂对象,创建连接 + * @email huawei_code@163.com + * @date 2021/2/2 + */ +public class ConnectionManager { + + private static Connection connection = null; + + public static Connection getConnection() { + if (connection == null) { + try { + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("192.168.56.10"); + factory.setPort(5672); + factory.setVirtualHost("/"); + connection = factory.newConnection(); + } catch (IOException | TimeoutException e) { + e.printStackTrace(); + } + } + return connection; + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/pubSub/PubSubRecv.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/pubSub/PubSubRecv.java new file mode 100644 index 000000000..613341cc1 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/pubSub/PubSubRecv.java @@ -0,0 +1,56 @@ +package com.youlai.common.rabbitmq.demo.pubSub; + +import com.rabbitmq.client.Channel; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ 发布订阅模型实战 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class PubSubRecv { + + + public static void main(String[] args) throws IOException, TimeoutException { + PubSubSend.declareExchange(); + PubSubRecv.declareQueues(); + PubSubRecv.declareBindings(); + + Channel channel = ConnectionManager.getConnection().createChannel(); + channel.basicConsume("WebClientQueue", true, (consumerTag, message) -> { + System.out.println(consumerTag); + System.out.println("WebClientQueue:" + new String(message.getBody())); + }, consumerTag -> { + System.out.println(consumerTag); + }); + + channel.basicConsume("AppClientQueue", true, (consumerTag, message) -> { + System.out.println(consumerTag); + System.out.println("AppClientQueue:" + new String(message.getBody())); + }, consumerTag -> { + System.out.println(consumerTag); + }); + } + + + public static void declareQueues() throws IOException, TimeoutException { + //Create a channel - do no't share the Channel instance + Channel channel = ConnectionManager.getConnection().createChannel(); + //queueDeclare - (queueName, durable, exclusive, autoDelete, arguments) + channel.queueDeclare("WebClientQueue", true, false, false, null); + channel.queueDeclare("AppClientQueue", true, false, false, null); + channel.close(); + } + + public static void declareBindings() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + //Create bindings - (queue, exchange, routingKey) + channel.queueBind("WebClientQueue", "my-fanout-exchange", ""); + channel.queueBind("AppClientQueue", "my-fanout-exchange", ""); + channel.close(); + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/pubSub/PubSubSend.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/pubSub/PubSubSend.java new file mode 100644 index 000000000..6154a0452 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/pubSub/PubSubSend.java @@ -0,0 +1,42 @@ +package com.youlai.common.rabbitmq.demo.pubSub; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ 发布订阅模型 + * 简单说明: + * 1、之前工作模型中,我们虽然由多个消费者但是每个消息只能被一个消费者消费;在发布订阅模型中只要订阅了这个消息的消费者都可以接收到消息 + * 2、正规的 RabbitMQ 使用方式,消息生产者先将消息发送到 Exchange 交换机中,在根据一定的策略将消息投递到队列中,消息生产者甚至不用知道队列的存在 + * 3、Exchange 交换机需要做两件事:第一、接收来自生产者发送的消息;第二、将消息投递到队列中 + * 4、Exchange 交换机必须知道如何正确的将消息投递到队列中(Direct exchange、Fanout exchange、Topic exchange、Headers exchange) + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class PubSubSend { + + public static void main(String[] args) { + + try (Channel channel = ConnectionManager.getConnection().createChannel()) { + PubSubSend.declareExchange(); + String message = "Hello world !"; + channel.basicPublish("my-fanout-exchange", "", null, message.getBytes(StandardCharsets.UTF_8)); + System.out.println("Send message :" + message); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void declareExchange() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + //Declare my-fanout-exchange + channel.exchangeDeclare("my-fanout-exchange", BuiltinExchangeType.FANOUT, true); + channel.close(); + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/routing/RoutingRecv.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/routing/RoutingRecv.java new file mode 100644 index 000000000..e521f8e7c --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/routing/RoutingRecv.java @@ -0,0 +1,61 @@ +package com.youlai.common.rabbitmq.demo.routing; + +import com.rabbitmq.client.*; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ 路由模型实战 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class RoutingRecv { + + public static void main(String[] args) throws IOException, TimeoutException { + + // 1、创建队列,并建立队列与路由器绑定关系 + RoutingRecv.declareQueues(); + RoutingRecv.declareBindings(); + + // 2、消费消息查看消费结果 + Channel channel = ConnectionManager.getConnection().createChannel(); + channel.basicConsume("WebClientQueue", true, (consumerTag, message) -> { + System.out.println(consumerTag); + System.out.println("WebClientQueue:" + new String(message.getBody())); + }, consumerTag -> { + System.out.println(consumerTag); + }); + + channel.basicConsume("AppClientQueue", true, (consumerTag, message) -> { + System.out.println(consumerTag); + System.out.println("AppClientQueue:" + new String(message.getBody())); + }, consumerTag -> { + System.out.println(consumerTag); + }); + + } + + public static void declareQueues() throws IOException, TimeoutException { + //Create a channel - do no't share the Channel instance + Channel channel = ConnectionManager.getConnection().createChannel(); + //queueDeclare - (queueName, durable, exclusive, autoDelete, arguments) + channel.queueDeclare("WebClientQueue", true, false, false, null); + channel.queueDeclare("AppClientQueue", true, false, false, null); + channel.close(); + } + + public static void declareBindings() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + //Create bindings - (queue, exchange, routingKey) + channel.queueBind("WebClientQueue", "my_exchange_direct", "info"); + channel.queueBind("WebClientQueue", "my_exchange_direct", "error"); + channel.queueBind("WebClientQueue", "my_exchange_direct", "warning"); + channel.queueBind("AppClientQueue", "my_exchange_direct", "error"); + channel.close(); + } + + +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/routing/RoutingSend.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/routing/RoutingSend.java new file mode 100644 index 000000000..219ae6b1d --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/routing/RoutingSend.java @@ -0,0 +1,40 @@ +package com.youlai.common.rabbitmq.demo.routing; + +import cn.hutool.core.util.CharsetUtil; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +/** + * @author huawei + * @desc RabbitMQ 路由模型 + * 简单说明: + * 1、路由模型需要指定交换机类型为 direct,交换机和队列之间通过路由键绑定,交换机只会把消息推送到符合路由键的队列中 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class RoutingSend { + + private final static String EXCHANGE_NAME = "my_exchange_direct"; + + public static void main(String[] args) { + // 1、创建 rabbitmq 连接和信道 + try (Channel channel = ConnectionManager.getConnection().createChannel()) { + + channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null); + String message = "Direct exchange,这条消息路由键是:info"; + channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes(CharsetUtil.UTF_8)); + + message = "Direct exchange,这条消息路由键是:error"; + channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(CharsetUtil.UTF_8)); + + message = "Direct exchange,这条消息路由键是:warning"; + channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes(CharsetUtil.UTF_8)); + + } catch (Exception e) { + e.printStackTrace(); + } + + } + +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/simple/SimpleRecv.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/simple/SimpleRecv.java new file mode 100644 index 000000000..12b868a70 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/simple/SimpleRecv.java @@ -0,0 +1,34 @@ +package com.youlai.common.rabbitmq.demo.simple; + +import com.rabbitmq.client.*; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ 简单队列实战 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class SimpleRecv { + + public static void main(String[] args) throws IOException, TimeoutException { + + // 2、创建连接,建立信道 + Channel channel = ConnectionManager.getConnection().createChannel(); + + // 3、指定要消费的队列,注意这里的配置必须与消息发送方配置的一直,否则无法消费 + channel.queueDeclare("hello", true, false, false, null); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + // 4、接收处理消息并自动确认 + channel.basicConsume("hello", true, (consumerTag, delivery) -> { + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + }, consumerTag -> { + System.out.println(consumerTag); + }); + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/simple/SimpleSend.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/simple/SimpleSend.java new file mode 100644 index 000000000..96b84a856 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/simple/SimpleSend.java @@ -0,0 +1,42 @@ +package com.youlai.common.rabbitmq.demo.simple; + +import com.rabbitmq.client.Channel; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.nio.charset.StandardCharsets; + +/** + * @author huawei + * @desc RabbitMQ 简单队列实战 + * @email huawei_code@163.com + * @date 2021/1/29 + */ +public class SimpleSend { + + public static void main(String[] args) { + + try (Channel channel = ConnectionManager.getConnection().createChannel();){ + /** + * queue: 队列名称 + * durable: 消息是否持久化 + * exclusive: 消息是否排他 + * autoDelete: 是否自动删除队列 + * arguments: 其他参数(例如:死信队列等信息) + */ + channel.queueDeclare("hello", true, false, false, null); + String message = "hello!"; + + /** + * 参数:String exchange, String routingKey, BasicProperties props, byte[] body + * exchange: 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由 + * routingKey: 路由键 + * props: 配置信息 + * body: 发送的消息 + */ + channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8)); + } catch (Exception e) { + e.printStackTrace(); + } + + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/topic/TopicRecv.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/topic/TopicRecv.java new file mode 100644 index 000000000..5f8527d48 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/topic/TopicRecv.java @@ -0,0 +1,71 @@ +package com.youlai.common.rabbitmq.demo.topic; + +import com.rabbitmq.client.*; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ topic路由模型 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class TopicRecv { + + public static void main(String[] args) throws IOException, TimeoutException { + TopicSend.declareExchange(); + TopicRecv.declareQueues(); + TopicRecv.declareBindings(); + + Channel channel = ConnectionManager.getConnection().createChannel(); + channel.basicConsume("HealthQueue", true, (consumerTag, message) -> { + System.out.println("\n\n=========== Health Queue =========="); + System.out.println(consumerTag); + System.out.println("HealthQueue: " + new String(message.getBody())); + System.out.println(message.getEnvelope()); + + }, consumerTag -> { + System.out.println(consumerTag); + }); + + channel.basicConsume("SportsQueue", true, (consumerTag, message) -> { + System.out.println("\n\n=========== Sports Queue =========="); + System.out.println(consumerTag); + System.out.println("SportsQueue: " + new String(message.getBody())); + System.out.println(message.getEnvelope()); + }, consumerTag -> { + System.out.println(consumerTag); + }); + + channel.basicConsume("EducationQueue", true, (consumerTag, message) -> { + System.out.println("\n\n=========== Education Queue =========="); + System.out.println(consumerTag); + System.out.println("EducationQueue: " + new String(message.getBody())); + System.out.println(message.getEnvelope()); + }, consumerTag -> { + System.out.println(consumerTag); + }); + } + + public static void declareQueues() throws IOException, TimeoutException { + //Create a channel - do no't share the Channel instance + Channel channel = ConnectionManager.getConnection().createChannel(); + //queueDeclare - (queueName, durable, exclusive, autoDelete, arguments) + channel.queueDeclare("HealthQueue", true, false, false, null); + channel.queueDeclare("SportsQueue", true, false, false, null); + channel.queueDeclare("EducationQueue", true, false, false, null); + + channel.close(); + } + + public static void declareBindings() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + //Create bindings - (queue, exchange, routingKey) + channel.queueBind("HealthQueue", "my-topic-exchange", "health.*"); + channel.queueBind("SportsQueue", "my-topic-exchange", "#.sports.*"); + channel.queueBind("EducationQueue", "my-topic-exchange", "#.education"); + channel.close(); + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/topic/TopicSend.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/topic/TopicSend.java new file mode 100644 index 000000000..697b30517 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/topic/TopicSend.java @@ -0,0 +1,42 @@ +package com.youlai.common.rabbitmq.demo.topic; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; +import com.youlai.common.rabbitmq.demo.manager.ConnectionManager; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ 路由模型 - 通配符模式 + * 简单说明: + * 1、路由模型需要指定交换机类型为 direct,交换机和队列之间通过路由键绑定,交换机只会把消息推送到符合路由键的队列中 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class TopicSend { + + public static void main(String[] args) throws IOException, TimeoutException { + TopicSend.declareExchange(); + try (Channel channel = ConnectionManager.getConnection().createChannel()) { + + String message = "Drink a lot of Water and stay Healthy!"; + channel.basicPublish("my-topic-exchange", "health.education", null, message.getBytes()); + message = "Learn something new everyday"; + channel.basicPublish("my-topic-exchange", "education", null, message.getBytes()); + message = "Stay fit in Mind and Body"; + channel.basicPublish("my-topic-exchange", "education.health", null, message.getBytes()); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void declareExchange() throws IOException, TimeoutException { + Channel channel = ConnectionManager.getConnection().createChannel(); + //Declare my-fanout-exchange + channel.exchangeDeclare("my-topic-exchange", BuiltinExchangeType.TOPIC, true); + channel.close(); + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/ConsumerReceive1.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/ConsumerReceive1.java new file mode 100644 index 000000000..0a123c94d --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/ConsumerReceive1.java @@ -0,0 +1,67 @@ +package com.youlai.common.rabbitmq.demo.work; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ 工作队列实战 + * RabbitMQ 工作队列模型,是创建多个消费者共同消费消息,每个消息只可以被一个消费者处理 + * 默认是轮询策略:使用轮询无法根据消费者消费速度合理分配消费数量,而是平均分配 + * 需要限制消费者每次消费消息的数量,每次消费完了才可以进行下一次消费 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class ConsumerReceive1 { + + private final static String QUEUE_NAME = "work_queue"; + + public static void main(String[] args) throws IOException, TimeoutException { + + // 1、创建 rabbitmq 连接工厂 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("192.168.56.10"); + factory.setPort(5672); + factory.setVirtualHost("/"); + + // 2、创建连接,建立信道 + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + // 3、指定要消费的队列,注意这里的配置必须与消息发送方配置的一直,否则无法消费 + channel.queueDeclare(QUEUE_NAME, true, false, false, null); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + // 模拟轮询策略,请将这部分注释 + // 4、限制消费者每次消费消息数量,每次消息处理完成后才能消费下一条消息 + int fetchCount = 1; + channel.basicQos(fetchCount); + + // 4、创建回调消费消息,并手动确认收到消息 + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + //模拟消费缓慢 + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println("--- Consumer 1 Received Message: '" + message + "'"); + + //手工确认消息消费,不是多条确认 + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + }; + + //关闭自动确认消息 + channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); + + + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/WorkerRecv2.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/WorkerRecv2.java new file mode 100644 index 000000000..eae8fec65 --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/WorkerRecv2.java @@ -0,0 +1,80 @@ +package com.youlai.common.rabbitmq.demo.work; + +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.DeliverCallback; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +/** + * @author huawei + * @desc RabbitMQ 工作队列实战 + * RabbitMQ 工作队列模型,是创建多个消费者共同消费消息,每个消息只可以被一个消费者处理 + * 默认是轮询策略 + * + * 简单队列模型和工作队列模型对比: + * 区别: + * 1、简单队列模型:是一个消费者和一个生产者 + * 2、工作队列模型:由一个生产者生产消息,多个消费者共同消费消息,但是每个消息只可以被一个消费者处理 + * + * 共同点:两种队列模型:都是直接将消息发送到Queue队列中,并没有Exchange交换机参与 + * @email huawei_code@163.com + * @date 2021/1/30 + */ +public class WorkerRecv2 { + + private final static String QUEUE_NAME = "work_queue"; + + public static void main(String[] args) throws IOException, TimeoutException { + + // 1、创建 rabbitmq 连接工厂 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("192.168.56.10"); + factory.setPort(5672); + factory.setVirtualHost("/"); + + // 2、创建连接,建立信道 + Connection connection = factory.newConnection(); + Channel channel = connection.createChannel(); + + // 3、指定要消费的队列,注意这里的配置必须与消息发送方配置的一直,否则无法消费 + channel.queueDeclare(QUEUE_NAME, true, false, false, null); + System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); + + // 模拟轮询策略,请将这部分注释 + // 4、限制消费者每次消费消息数量,每次消息处理完成后才能消费下一条消息 + int fetchCount = 1; + channel.basicQos(fetchCount); + + // 4、创建回调消费消息,并手动确认收到消息 + DeliverCallback deliverCallback = (consumerTag, delivery) -> { + + String message = new String(delivery.getBody(), "UTF-8"); + System.out.println(" [x] Received '" + message + "'"); + + try { + doWork(message); + } finally { + System.out.println(" [x] Done"); + channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + } + }; + + // 5、关闭自动确认消息 + channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); + } + + private static void doWork(String task) { + for (char ch : task.toCharArray()) { + if (ch == '.') { + try { + Thread.sleep(1000); + } catch (InterruptedException _ignored) { + Thread.currentThread().interrupt(); + } + } + } + } +} diff --git a/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/WorkerSend.java b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/WorkerSend.java new file mode 100644 index 000000000..bbbad6d9e --- /dev/null +++ b/youlai-common/common-rabbitmq/src/main/java/com/youlai/common/rabbitmq/demo/work/WorkerSend.java @@ -0,0 +1,59 @@ +package com.youlai.common.rabbitmq.demo.work; + +import cn.hutool.core.util.CharsetUtil; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; + +/** + * @author huawei + * @desc RabbitMQ 工作队列实战 + *

+ * 消息生产能力大于消费能力,增加多几个消费节点 + * 和简单队列类似,增加多个几个消费节点,处于竞争关系 + * 默认策略:round robin 轮训 + * @email huawei_code@163.com + * @date 2021/1/29 + */ +public class WorkerSend { + + private final static String QUEUE_NAME = "work_queue"; + + public static void main(String[] args) { + // 1、创建 rabbitmq 连接工厂 + ConnectionFactory factory = new ConnectionFactory(); + factory.setHost("192.168.56.10"); + factory.setPort(5672); + factory.setVirtualHost("/"); + + // 2、创建 rabbitmq 连接和信道 + try (Connection connection = factory.newConnection(); + Channel channel = connection.createChannel();) { + + /** + * 参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments + * queue: 队列名称 + * durable: 消息是否持久化 + * exclusive: 消息是否排他 + * autoDelete: 是否自动删除队列 + * arguments: 其他参数(例如:死信队列等信息) + */ + channel.queueDeclare(QUEUE_NAME, true, false, false, null); + + for (int i = 0; i < 10; i++) { + /** + * 参数:String exchange, String routingKey, BasicProperties props, byte[] body + * exchange: 交换机名称:不写则是默认的交换机,那路由健需要和队列名称一样才可以被路由 + * routingKey: 路由键 + * props: 配置信息 + * body: 发送的消息 + */ + String message = "Hello world !" + i; + channel.basicPublish("", QUEUE_NAME, null, message.getBytes(CharsetUtil.UTF_8)); + System.out.println("--- Send Message: " + message); + } + } catch (Exception e) { + e.printStackTrace(); + } + } +}