This commit is contained in:
hxrui 2021-02-04 19:54:53 +08:00
commit fe0d077568
17 changed files with 910 additions and 18 deletions

View File

@ -1,7 +1,7 @@
package com.youlai.common.rabbitmq.config; package com.youlai.common.rabbitmq.config;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
@ -33,16 +33,17 @@ public class RabbitMqConfig {
* @return * @return
*/ */
@Bean @Bean
public MessageConverter messageConverter() { public MessageConverter jackson2MessageConverter() {
return new Jackson2JsonMessageConverter(); return new Jackson2JsonMessageConverter();
} }
/** /**
* 为容器创建号rabbitTemplate注册confirmCallback * 生产者投递消息后如果Broker收到消息后会给生产者一个ACK生产者通过ACK可以确认这条消息是否正常发送到Broker这种方式是消息可靠性投递的核心
* 消息由生产者投递到Broker/Exchange回调 * 步骤1yaml文件中添加配置 spring.rabbitmq.publisher-confirm-type: correlated
* 步骤2编写代码
*/ */
@PostConstruct @PostConstruct
public void setExchangeCallback() { public void setConfirmCallback() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@ -63,27 +64,24 @@ public class RabbitMqConfig {
} }
/** /**
*
* 注意下面两项必须同时配置可以尝试不配置第二项通过测试能够发现当消息路由到Queue失败(比如路由件错误)returnCallback并未被回调 * 注意下面两项必须同时配置可以尝试不配置第二项通过测试能够发现当消息路由到Queue失败(比如路由件错误)returnCallback并未被回调
* # 开启阶段二(消息从E->Q)的确认回调 Exchange --> Queue returnCallback * # 开启阶段二(消息从E->Q)的确认回调 Exchange --> Queue returnCallback
* spring.rabbitmq.publisher-returns=true * spring.rabbitmq.publisher-returns=true
* # 官方文档说此时这一项必须设置为true *
* # 实际上这一项的作用是消息未成功到达队列时能监听到到路由不可达的消息以异步方式优先调用我们自己设置的returnCallback默认情况下这个消息会被直接丢弃无法监听到 * #为true,则交换机处理消息到路由失败则会返回给生产者
* spring.rabbitmq.template.mandatory=true * spring.rabbitmq.template.mandatory=true
*/ */
@PostConstruct @PostConstruct
public void setQueueCallback() { public void setQueueCallback() {
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* 这个方法的参数并没有像 confirmCallback 那样提供boolean类型的ack因此这个回调只能在失败情况下触发
* @param message 发送消息
* @param replyCode 回复错误码
* @param replyText 回复错误内容
* @param exchange 发送消息时指定的交换机
* @param routingKey 发送消息时使用的路由件
*/
@Override @Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("路由到队列失败,[消息内容:{},交换机:{},路由件:{},回复码:{},回复文本:{}]", message, exchange, routingKey, replyCode, replyText); log.error("路由到队列失败,[消息内容:{},交换机:{},路由件:{},回复码:{},回复文本:{}]",
returnedMessage.getMessage(), returnedMessage.getExchange(),
returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(), returnedMessage.getReplyText());
} }
}); });
} }

View File

@ -0,0 +1,84 @@
package com.youlai.common.rabbitmq.demo.exchangeToExchange;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ headers路由模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class Recv {
public static void main(String[] args) throws IOException, TimeoutException {
Send.declareExchanges();
Send.declareExchangesBindings();
Recv.declareQueues();
Recv.declareQueueBindings();
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("MobileQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("MobileQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("ACQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("ACQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("LightQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("LightQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("LaptopQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("LaptopQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("FanQ", true, (consumerTag, message) -> {
System.out.println("\n\n" + message.getEnvelope());
System.out.println("FanQ:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
}
//Declare the Queues
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do not share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//Create the Queues for linked-direct-exchange
channel.queueDeclare("MobileQ", true, false, false, null);
channel.queueDeclare("ACQ", true, false, false, null);
channel.queueDeclare("LightQ", true, false, false, null);
//Create the Queues for home-direct-exchange
channel.queueDeclare("FanQ", true, false, false, null);
channel.queueDeclare("LaptopQ", true, false, false, null);
channel.close();
}
//Create the Bindings between respective Queues and Exchanges
public static void declareQueueBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey)
channel.queueBind("MobileQ", "linked-direct-exchange", "personalDevice");
channel.queueBind("ACQ", "linked-direct-exchange", "homeAppliance");
channel.queueBind("LightQ", "linked-direct-exchange", "homeAppliance");
//Create the bindings - with home-direct-exchange
channel.queueBind("FanQ", "home-direct-exchange", "homeAppliance");
channel.queueBind("LaptopQ", "home-direct-exchange", "personalDevice");
channel.close();
}
}

View File

@ -0,0 +1,46 @@
package com.youlai.common.rabbitmq.demo.exchangeToExchange;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ headers模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class Send {
public static void main(String[] args) throws IOException, TimeoutException {
Send.declareExchanges();
Send.declareExchangesBindings();
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
String message = "Direct message - Turn on the Home Appliances ";
channel.basicPublish("home-direct-exchange", "homeAppliance", null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
//Declare the exchanges
public static void declareExchanges() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare both the exchanges - linked-direct-exchange and home-direct-exchange.
channel.exchangeDeclare("linked-direct-exchange", BuiltinExchangeType.DIRECT, true);
channel.exchangeDeclare("home-direct-exchange", BuiltinExchangeType.DIRECT, true);
channel.close();
}
//Create the Bindings between Exchanges.
public static void declareExchangesBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
// (destination-exchange, source-exchange , routingKey
channel.exchangeBind("linked-direct-exchange", "home-direct-exchange", "homeAppliance");
channel.close();
}
}

View File

@ -0,0 +1,87 @@
package com.youlai.common.rabbitmq.demo.headers;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ headers路由模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class HeadersRecv {
public static void main(String[] args) throws IOException, TimeoutException {
HeadersSend.declareExchange();
HeadersRecv.declareQueues();
HeadersRecv.declareBindings();
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("HealthQueue", true, ((consumerTag, message) -> {
System.out.println("\n\n=========== Health Queue ==========");
System.out.println(consumerTag);
System.out.println("HealthQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("SportsQueue", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Sports Queue ==========");
System.out.println(consumerTag);
System.out.println("SportsQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("EducationQueue", true, ((consumerTag, message) -> {
System.out.println("\n\n ============ Education Queue ==========");
System.out.println(consumerTag);
System.out.println("EducationQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}), consumerTag -> {
System.out.println(consumerTag);
});
}
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do no't share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare("HealthQueue", true, false, false, null);
channel.queueDeclare("SportsQueue", true, false, false, null);
channel.queueDeclare("EducationQueue", true, false, false, null);
channel.close();
}
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey, headers) - routingKey != null
Map<String, Object> healthArgs = new HashMap<>();
healthArgs.put("x-match", "any"); //Match any of the header
healthArgs.put("h1", "Header1");
healthArgs.put("h2", "Header2");
channel.queueBind("HealthQ", "my-header-exchange", "", healthArgs);
Map<String, Object> sportsArgs = new HashMap<>();
sportsArgs.put("x-match", "all"); //Match all of the header
sportsArgs.put("h1", "Header1");
sportsArgs.put("h2", "Header2");
channel.queueBind("SportsQ", "my-header-exchange", "", sportsArgs);
Map<String, Object> educationArgs = new HashMap<>();
educationArgs.put("x-match", "any"); //Match any of the header
educationArgs.put("h1", "Header1");
educationArgs.put("h2", "Header2");
channel.queueBind("EducationQ", "my-header-exchange", "", educationArgs);
channel.close();
}
}

View File

@ -0,0 +1,50 @@
package com.youlai.common.rabbitmq.demo.headers;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ headers模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class HeadersSend {
public static void main(String[] args) throws IOException, TimeoutException {
HeadersSend.declareExchange();
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
String message = "Header Exchange example 1";
Map<String, Object> headerMap = new HashMap<>();
headerMap.put("h1", "Header1");
headerMap.put("h3", "Header3");
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().headers(headerMap).build();
channel.basicPublish("my-header-exchange", "", properties, message.getBytes());
message = "Header Exchange example 2";
headerMap.put("h2", "Header2");
properties = new AMQP.BasicProperties()
.builder().headers(headerMap).build();
channel.basicPublish("my-header-exchange", "", properties, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-fanout-exchange
channel.exchangeDeclare("my-header-exchange", BuiltinExchangeType.HEADERS, true);
channel.close();
}
}

View File

@ -0,0 +1,33 @@
package com.youlai.common.rabbitmq.demo.manager;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc 创建 Connection 连接工厂对象创建连接
* @email huawei_code@163.com
* @date 2021/2/2
*/
public class ConnectionManager {
private static Connection connection = null;
public static Connection getConnection() {
if (connection == null) {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.10");
factory.setPort(5672);
factory.setVirtualHost("/");
connection = factory.newConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
return connection;
}
}

View File

@ -0,0 +1,56 @@
package com.youlai.common.rabbitmq.demo.pubSub;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 发布订阅模型实战
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class PubSubRecv {
public static void main(String[] args) throws IOException, TimeoutException {
PubSubSend.declareExchange();
PubSubRecv.declareQueues();
PubSubRecv.declareBindings();
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("WebClientQueue", true, (consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("WebClientQueue:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("AppClientQueue", true, (consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("AppClientQueue:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
}
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do no't share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare("WebClientQueue", true, false, false, null);
channel.queueDeclare("AppClientQueue", true, false, false, null);
channel.close();
}
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey)
channel.queueBind("WebClientQueue", "my-fanout-exchange", "");
channel.queueBind("AppClientQueue", "my-fanout-exchange", "");
channel.close();
}
}

View File

@ -0,0 +1,42 @@
package com.youlai.common.rabbitmq.demo.pubSub;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 发布订阅模型
* 简单说明
* 1之前工作模型中我们虽然由多个消费者但是每个消息只能被一个消费者消费在发布订阅模型中只要订阅了这个消息的消费者都可以接收到消息
* 2正规的 RabbitMQ 使用方式消息生产者先将消息发送到 Exchange 交换机中在根据一定的策略将消息投递到队列中消息生产者甚至不用知道队列的存在
* 3Exchange 交换机需要做两件事第一接收来自生产者发送的消息第二将消息投递到队列中
* 4Exchange 交换机必须知道如何正确的将消息投递到队列中Direct exchangeFanout exchangeTopic exchangeHeaders exchange
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class PubSubSend {
public static void main(String[] args) {
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
PubSubSend.declareExchange();
String message = "Hello world !";
channel.basicPublish("my-fanout-exchange", "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println("Send message " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-fanout-exchange
channel.exchangeDeclare("my-fanout-exchange", BuiltinExchangeType.FANOUT, true);
channel.close();
}
}

View File

@ -0,0 +1,61 @@
package com.youlai.common.rabbitmq.demo.routing;
import com.rabbitmq.client.*;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 路由模型实战
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class RoutingRecv {
public static void main(String[] args) throws IOException, TimeoutException {
// 1创建队列并建立队列与路由器绑定关系
RoutingRecv.declareQueues();
RoutingRecv.declareBindings();
// 2消费消息查看消费结果
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("WebClientQueue", true, (consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("WebClientQueue:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("AppClientQueue", true, (consumerTag, message) -> {
System.out.println(consumerTag);
System.out.println("AppClientQueue:" + new String(message.getBody()));
}, consumerTag -> {
System.out.println(consumerTag);
});
}
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do no't share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare("WebClientQueue", true, false, false, null);
channel.queueDeclare("AppClientQueue", true, false, false, null);
channel.close();
}
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey)
channel.queueBind("WebClientQueue", "my_exchange_direct", "info");
channel.queueBind("WebClientQueue", "my_exchange_direct", "error");
channel.queueBind("WebClientQueue", "my_exchange_direct", "warning");
channel.queueBind("AppClientQueue", "my_exchange_direct", "error");
channel.close();
}
}

View File

@ -0,0 +1,40 @@
package com.youlai.common.rabbitmq.demo.routing;
import cn.hutool.core.util.CharsetUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
/**
* @author huawei
* @desc RabbitMQ 路由模型
* 简单说明
* 1路由模型需要指定交换机类型为 direct交换机和队列之间通过路由键绑定交换机只会把消息推送到符合路由键的队列中
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class RoutingSend {
private final static String EXCHANGE_NAME = "my_exchange_direct";
public static void main(String[] args) {
// 1创建 rabbitmq 连接和信道
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);
String message = "Direct exchange这条消息路由键是info";
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes(CharsetUtil.UTF_8));
message = "Direct exchange这条消息路由键是error";
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes(CharsetUtil.UTF_8));
message = "Direct exchange这条消息路由键是warning";
channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes(CharsetUtil.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,34 @@
package com.youlai.common.rabbitmq.demo.simple;
import com.rabbitmq.client.*;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 简单队列实战
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class SimpleRecv {
public static void main(String[] args) throws IOException, TimeoutException {
// 2创建连接建立信道
Channel channel = ConnectionManager.getConnection().createChannel();
// 3指定要消费的队列注意这里的配置必须与消息发送方配置的一直否则无法消费
channel.queueDeclare("hello", true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 4接收处理消息并自动确认
channel.basicConsume("hello", true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}, consumerTag -> {
System.out.println(consumerTag);
});
}
}

View File

@ -0,0 +1,42 @@
package com.youlai.common.rabbitmq.demo.simple;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.nio.charset.StandardCharsets;
/**
* @author huawei
* @desc RabbitMQ 简单队列实战
* @email huawei_code@163.com
* @date 2021/1/29
*/
public class SimpleSend {
public static void main(String[] args) {
try (Channel channel = ConnectionManager.getConnection().createChannel();){
/**
* queue: 队列名称
* durable: 消息是否持久化
* exclusive: 消息是否排他
* autoDelete: 是否自动删除队列
* arguments: 其他参数例如死信队列等信息
*/
channel.queueDeclare("hello", true, false, false, null);
String message = "hello!";
/**
* 参数String exchange, String routingKey, BasicProperties props, byte[] body
* exchange: 交换机名称不写则是默认的交换机那路由健需要和队列名称一样才可以被路由
* routingKey: 路由键
* props: 配置信息
* body: 发送的消息
*/
channel.basicPublish("", "hello", null, message.getBytes(StandardCharsets.UTF_8));
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,71 @@
package com.youlai.common.rabbitmq.demo.topic;
import com.rabbitmq.client.*;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ topic路由模型
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class TopicRecv {
public static void main(String[] args) throws IOException, TimeoutException {
TopicSend.declareExchange();
TopicRecv.declareQueues();
TopicRecv.declareBindings();
Channel channel = ConnectionManager.getConnection().createChannel();
channel.basicConsume("HealthQueue", true, (consumerTag, message) -> {
System.out.println("\n\n=========== Health Queue ==========");
System.out.println(consumerTag);
System.out.println("HealthQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("SportsQueue", true, (consumerTag, message) -> {
System.out.println("\n\n=========== Sports Queue ==========");
System.out.println(consumerTag);
System.out.println("SportsQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}, consumerTag -> {
System.out.println(consumerTag);
});
channel.basicConsume("EducationQueue", true, (consumerTag, message) -> {
System.out.println("\n\n=========== Education Queue ==========");
System.out.println(consumerTag);
System.out.println("EducationQueue: " + new String(message.getBody()));
System.out.println(message.getEnvelope());
}, consumerTag -> {
System.out.println(consumerTag);
});
}
public static void declareQueues() throws IOException, TimeoutException {
//Create a channel - do no't share the Channel instance
Channel channel = ConnectionManager.getConnection().createChannel();
//queueDeclare - (queueName, durable, exclusive, autoDelete, arguments)
channel.queueDeclare("HealthQueue", true, false, false, null);
channel.queueDeclare("SportsQueue", true, false, false, null);
channel.queueDeclare("EducationQueue", true, false, false, null);
channel.close();
}
public static void declareBindings() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Create bindings - (queue, exchange, routingKey)
channel.queueBind("HealthQueue", "my-topic-exchange", "health.*");
channel.queueBind("SportsQueue", "my-topic-exchange", "#.sports.*");
channel.queueBind("EducationQueue", "my-topic-exchange", "#.education");
channel.close();
}
}

View File

@ -0,0 +1,42 @@
package com.youlai.common.rabbitmq.demo.topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.youlai.common.rabbitmq.demo.manager.ConnectionManager;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 路由模型 - 通配符模式
* 简单说明
* 1路由模型需要指定交换机类型为 direct交换机和队列之间通过路由键绑定交换机只会把消息推送到符合路由键的队列中
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class TopicSend {
public static void main(String[] args) throws IOException, TimeoutException {
TopicSend.declareExchange();
try (Channel channel = ConnectionManager.getConnection().createChannel()) {
String message = "Drink a lot of Water and stay Healthy!";
channel.basicPublish("my-topic-exchange", "health.education", null, message.getBytes());
message = "Learn something new everyday";
channel.basicPublish("my-topic-exchange", "education", null, message.getBytes());
message = "Stay fit in Mind and Body";
channel.basicPublish("my-topic-exchange", "education.health", null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
public static void declareExchange() throws IOException, TimeoutException {
Channel channel = ConnectionManager.getConnection().createChannel();
//Declare my-fanout-exchange
channel.exchangeDeclare("my-topic-exchange", BuiltinExchangeType.TOPIC, true);
channel.close();
}
}

View File

@ -0,0 +1,67 @@
package com.youlai.common.rabbitmq.demo.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 工作队列实战
* RabbitMQ 工作队列模型是创建多个消费者共同消费消息每个消息只可以被一个消费者处理
* 默认是轮询策略使用轮询无法根据消费者消费速度合理分配消费数量而是平均分配
* 需要限制消费者每次消费消息的数量每次消费完了才可以进行下一次消费
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class ConsumerReceive1 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1创建 rabbitmq 连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.10");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2创建连接建立信道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3指定要消费的队列注意这里的配置必须与消息发送方配置的一直否则无法消费
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 模拟轮询策略请将这部分注释
// 4限制消费者每次消费消息数量每次消息处理完成后才能消费下一条消息
int fetchCount = 1;
channel.basicQos(fetchCount);
// 4创建回调消费消息并手动确认收到消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
//模拟消费缓慢
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("--- Consumer 1 Received Message: '" + message + "'");
//手工确认消息消费不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
//关闭自动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}

View File

@ -0,0 +1,80 @@
package com.youlai.common.rabbitmq.demo.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author huawei
* @desc RabbitMQ 工作队列实战
* RabbitMQ 工作队列模型是创建多个消费者共同消费消息每个消息只可以被一个消费者处理
* 默认是轮询策略
*
* 简单队列模型和工作队列模型对比
* 区别
* 1简单队列模型是一个消费者和一个生产者
* 2工作队列模型由一个生产者生产消息多个消费者共同消费消息但是每个消息只可以被一个消费者处理
*
* 共同点两种队列模型都是直接将消息发送到Queue队列中并没有Exchange交换机参与
* @email huawei_code@163.com
* @date 2021/1/30
*/
public class WorkerRecv2 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1创建 rabbitmq 连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.10");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2创建连接建立信道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 3指定要消费的队列注意这里的配置必须与消息发送方配置的一直否则无法消费
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 模拟轮询策略请将这部分注释
// 4限制消费者每次消费消息数量每次消息处理完成后才能消费下一条消息
int fetchCount = 1;
channel.basicQos(fetchCount);
// 4创建回调消费消息并手动确认收到消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 5关闭自动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}

View File

@ -0,0 +1,59 @@
package com.youlai.common.rabbitmq.demo.work;
import cn.hutool.core.util.CharsetUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author huawei
* @desc RabbitMQ 工作队列实战
* <p>
* 消息生产能力大于消费能力增加多几个消费节点
* 和简单队列类似增加多个几个消费节点处于竞争关系
* 默认策略round robin 轮训
* @email huawei_code@163.com
* @date 2021/1/29
*/
public class WorkerSend {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) {
// 1创建 rabbitmq 连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.10");
factory.setPort(5672);
factory.setVirtualHost("/");
// 2创建 rabbitmq 连接和信道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel();) {
/**
* 参数String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* queue: 队列名称
* durable: 消息是否持久化
* exclusive: 消息是否排他
* autoDelete: 是否自动删除队列
* arguments: 其他参数例如死信队列等信息
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 10; i++) {
/**
* 参数String exchange, String routingKey, BasicProperties props, byte[] body
* exchange: 交换机名称不写则是默认的交换机那路由健需要和队列名称一样才可以被路由
* routingKey: 路由键
* props: 配置信息
* body: 发送的消息
*/
String message = "Hello world " + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(CharsetUtil.UTF_8));
System.out.println("--- Send Message: " + message);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}