diff --git a/youlai-lab/pom.xml b/youlai-lab/pom.xml index 6663d6389..2acf29b2c 100644 --- a/youlai-lab/pom.xml +++ b/youlai-lab/pom.xml @@ -50,6 +50,13 @@ + + + org.apache.rocketmq + rocketmq-spring-boot-starter + 2.2.2 + + org.springframework.cloud spring-cloud-starter-loadbalancer diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/BaseConsumer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/BaseConsumer.java new file mode 100644 index 000000000..932650a71 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/BaseConsumer.java @@ -0,0 +1,27 @@ +package com.youlai.lab.rocketmq.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @author zc + * @date 2022-10-27 00:49 + */ +@Component +@RocketMQMessageListener(selectorExpression = "", topic = "base_topic", consumerGroup = "base_group") +@Slf4j +public class BaseConsumer implements RocketMQListener { + + /** + * 测试接收将参数topic定死,实际开发写入到配置文件 + * + * @param message + */ + @Override + public void onMessage(String message) { + log.info("基本信息案例-接受到消息:" + message); + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/BatchConsumer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/BatchConsumer.java new file mode 100644 index 000000000..7500ed219 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/BatchConsumer.java @@ -0,0 +1,26 @@ +package com.youlai.lab.rocketmq.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @author zc + * @date 2022-10-27 00:50 + */ +@Component +@RocketMQMessageListener(topic = "batch_topic", consumerGroup = "batch_group") +@Slf4j +public class BatchConsumer implements RocketMQListener { + + /** + * 测试接收将参数topic定死,实际开发写入到配置文件 + * + * @param message + */ + @Override + public void onMessage(String message) { + log.info("批量消息-接受到消息:" + message); + } +} diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/ExConsumer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/ExConsumer.java new file mode 100644 index 000000000..158eb715b --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/ExConsumer.java @@ -0,0 +1,29 @@ +package com.youlai.lab.rocketmq.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +import java.nio.charset.StandardCharsets; + +/** + * @author zc + * @date 2022-10-27 00:50 + */ +@Component +@RocketMQMessageListener(topic = "ex_topic", consumerGroup = "ex_group") +@Slf4j +public class ExConsumer implements RocketMQListener { + + // 测试接收将参数topic定死,实际开发写入到配置文件 + + @Override + public void onMessage(MessageExt message) { + String msg = new String(message.getBody(), StandardCharsets.UTF_8); + log.info("message对象:{}", message); + log.info("接收消息:{}", msg); + } + +} diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/ReplyConsumer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/ReplyConsumer.java new file mode 100644 index 000000000..3cd497ac4 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/ReplyConsumer.java @@ -0,0 +1,25 @@ +package com.youlai.lab.rocketmq.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQReplyListener; +import org.springframework.stereotype.Component; + +/** + * @author zc + * @date 2022-10-27 00:53 + */ +@Component +@RocketMQMessageListener(topic = "reply_topic", consumerGroup = "reply_group") +@Slf4j +public class ReplyConsumer implements RocketMQReplyListener { + + @Override + public byte[] onMessage(String message) { + log.info("接受到消息:" + message); + + // 返回消息到生成者 + return "返回消息到生产者".getBytes(); + } + +} diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/SQLConsumer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/SQLConsumer.java new file mode 100644 index 000000000..4ee1a9ba3 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/SQLConsumer.java @@ -0,0 +1,28 @@ +package com.youlai.lab.rocketmq.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.annotation.SelectorType; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @author zc + * @date 2022-10-27 00:53 + */ +@Component +@RocketMQMessageListener(selectorType = SelectorType.SQL92, selectorExpression = "a between 0 and 6 or b > 8", topic = "sql_topic", consumerGroup = "sql_group") +@Slf4j +public class SQLConsumer implements RocketMQListener { + + /** + * 测试接收将参数topic定死,实际开发写入到配置文件 + * + * @param message + */ + @Override + public void onMessage(String message) { + log.info("SQL92过滤消息-接受到消息:" + message); + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/ScheduleConsumer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/ScheduleConsumer.java new file mode 100644 index 000000000..a00b0abe1 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/ScheduleConsumer.java @@ -0,0 +1,25 @@ +package com.youlai.lab.rocketmq.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @author zc + * @date 2022-10-27 00:53 + */ +@Component +@RocketMQMessageListener(topic = "scheduled_topic", consumerGroup = "scheduled_group") +@Slf4j +public class ScheduleConsumer implements RocketMQListener { + + /** + * 测试接收将参数topic定死,实际开发写入到配置文件 + * @param message + */ + @Override + public void onMessage(String message) { + log.info("延时消息-接受到消息:" + message); + } +} diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/TagConsumer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/TagConsumer.java new file mode 100644 index 000000000..003c88c36 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/TagConsumer.java @@ -0,0 +1,27 @@ +package com.youlai.lab.rocketmq.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @author zc + * @date 2022-10-27 00:54 + */ +@Component +@RocketMQMessageListener(selectorExpression = "TAG-A||TAG-B", topic = "tag_topic", consumerGroup = "tag_group") +@Slf4j +public class TagConsumer implements RocketMQListener { + + /** + * 测试接收将参数topic定死,实际开发写入到配置文件 + * + * @param message + */ + @Override + public void onMessage(String message) { + log.info("标签过滤消息-接受到消息:" + message); + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/TestConsumer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/TestConsumer.java new file mode 100644 index 000000000..434db88fe --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/consumer/TestConsumer.java @@ -0,0 +1,27 @@ +package com.youlai.lab.rocketmq.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.stereotype.Component; + +/** + * @author zc + * @date 2022-10-27 00:54 + */ +@Component +@RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_group") +@Slf4j +public class TestConsumer implements RocketMQListener { + + /** + * 测试接收将参数topic定死,实际开发写入到配置文件 + * + * @param message + */ + @Override + public void onMessage(String message) { + log.info("TestConsumer - 接受到消息:" + message); + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/controller/TestController.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/controller/TestController.java new file mode 100644 index 000000000..42b0cac4e --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/controller/TestController.java @@ -0,0 +1,179 @@ +package com.youlai.lab.rocketmq.controller; + +import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport; +import com.youlai.lab.rocketmq.producer.*; +import com.youlai.lab.rocketmq.producer.tx.TxProducer; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import javax.annotation.Resource; + +/** + * @author zc + * @date 2022-10-27 01:01 + */ + +@Api(tags = "测试接口") +@RestController +@RequestMapping("test") +public class TestController { + + /** + * 搭建测试流程生成者 + */ + @Resource + private TestProducer testProducer; + + /** + * 基本信息案例 + */ + @Resource + private BaseProducer baseProducer; + + /** + * 顺序消息发送样例 + */ + @Resource + private OrderProducer orderProducer; + + /** + * 延时消息 + */ + @Resource + private ScheduledProducer scheduledProducer; + + /** + * 指标标签 + */ + @Resource + private TagProducer tagProducer; + + /** + * 过滤消息 + */ + @Resource + private SQLProducer SQLProducer; + + /** + * 消息事务 + */ + @Resource + private TxProducer txProducer; + + /** + * 消息额外属性测试 + */ + @Resource + private ExProducer exProducer; + + /** + * 回馈消息样例 + */ + @Resource + private ReplyProducer replyProducer; + + /** + * 批量消息发送 + */ + @Resource + private BatchProducer batchProducer; + + @ApiOperation(value = "测试消息生成及发送", notes = "测试消息生成及发送") + @ApiOperationSupport(order = 5) + @GetMapping + public Object test() { + testProducer.send(); + return "测试消息生成及发送"; + } + + + @ApiOperation(value = "基本消息样例", notes = "基本消息样例") + @ApiOperationSupport(order = 10) + @GetMapping("/base") + public Object base() { + // 同步发送 + baseProducer.sync(); + // 异步发送 + baseProducer.async(); + // 单向发送 + baseProducer.oneWay(); + return "基本消息样例"; + } + + @ApiOperation(value = "发送顺序消息", notes = "发送顺序消息") + @ApiOperationSupport(order = 15) + @GetMapping("/order") + public Object order() { + orderProducer.order(); + return "发送顺序消息"; + } + + + @ApiOperation(value = "发送延时消息", notes = "发送延时消息") + @ApiOperationSupport(order = 20) + @GetMapping("/scheduled") + public Object scheduled() { + scheduledProducer.scheduled(); + return "发送延时消息"; + } + + @ApiOperation(value = "标签过滤消息样例", notes = "标签过滤消息样例") + @ApiOperationSupport(order = 25) + @GetMapping("/tag") + public Object tag() { + // TAG过滤 + tagProducer.tag(); + return "指定标签消息"; + } + + @ApiOperation(value = "SQL92过滤消息样例", notes = "SQL92过滤消息样例") + @ApiOperationSupport(order = 30) + @GetMapping("/selector") + public Object selector() { + // SQL92过滤 + SQLProducer.selector(); + return "过滤消息样例"; + } + + @ApiOperation(value = "消息事务样例", notes = "消息事务样例") + @ApiOperationSupport(order = 35) + @GetMapping("/tx") + public Object tx() { + + // 消息事务 + txProducer.tx(); + return "消息事务样例"; + } + + + @ApiOperation(value = "消息额外属性", notes = "消息额外属性") + @ApiOperationSupport(order = 40) + @GetMapping("/ex") + public Object ex() { + // 消息事务 + exProducer.ex(); + return "消息额外属性"; + } + + @ApiOperation(value = "回馈消息样例", notes = "回馈消息样例") + @ApiOperationSupport(order = 40) + @GetMapping("/reply") + public Object reply() { + // 消息事务 + replyProducer.reply(); + return "回馈消息样例"; + } + + @ApiOperation(value = "批量消息样例", notes = "批量消息样例") + @ApiOperationSupport(order = 45) + @GetMapping("/batch") + public Object batch() { + // 批量消息样例 + batchProducer.batch(); + return "批量消息样例"; + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/ext/TxRocketMQTemplate.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/ext/TxRocketMQTemplate.java new file mode 100644 index 000000000..2b2c8b484 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/ext/TxRocketMQTemplate.java @@ -0,0 +1,9 @@ +package com.youlai.lab.rocketmq.ext; + +import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration; +import org.apache.rocketmq.spring.core.RocketMQTemplate; + +@ExtRocketMQTemplateConfiguration +public class TxRocketMQTemplate extends RocketMQTemplate { + +} diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/BaseProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/BaseProducer.java new file mode 100644 index 000000000..b385eed13 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/BaseProducer.java @@ -0,0 +1,74 @@ +package com.youlai.lab.rocketmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendCallback; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author zc + * @date 2022-10-27 00:56 + */ +@Service +@Slf4j +public class BaseProducer { + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + */ + @Resource + RocketMQTemplate rocketMQTemplate; + + /** + * 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。 + */ + public void sync() { + String text = "基本信息案例-同步发送," + System.currentTimeMillis(); + log.info(text); + + rocketMQTemplate.syncSend("base_topic", text); + log.info("同步发送-已发送..."); + } + + /** + * 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。 + */ + public void async() { + String text = "基本信息案例-异步发送," + System.currentTimeMillis(); + log.info(text); + for (int a = 1; a <= 10; a++) { + int finalA = a; + rocketMQTemplate.asyncSend("base_topic", text + ",ID:" + a, new SendCallback() { + + // SendCallback接收异步返回结果的回调 + // 成功发送 + @Override + public void onSuccess(SendResult sendResult) { + log.info("异步发送 - 发送成功,ID:" + finalA); + } + + // 发送失败 + @Override + public void onException(Throwable throwable) { + log.info("异步发送 - 发送失败"); + throwable.printStackTrace(); + } + }); + } + log.info("异步发送-已发送..."); + } + + /** + * 这种方式主要用在不特别关心发送结果的场景,例如日志发送。 + */ + public void oneWay() { + String text = "基本信息案例-单向发送" + System.currentTimeMillis(); + log.info(text); + + rocketMQTemplate.sendOneWay("base_topic", text); + log.info("单向发送-已发送..."); + } + +} diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/BatchProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/BatchProducer.java new file mode 100644 index 000000000..4d016c756 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/BatchProducer.java @@ -0,0 +1,53 @@ +package com.youlai.lab.rocketmq.producer; + +/** + * @author zc + * @date 2022-10-27 00:57 + */ + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.ArrayList; +import java.util.List; +@Service +@Slf4j +public class BatchProducer { + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + */ + @Resource + RocketMQTemplate rocketMQTemplate; + + public void batch() { + String text = "批量消息"; + log.info(text); + + List messageList = new ArrayList<>(); + for (int i = 1; i <= 10; i++) { + messageList.add(MessageBuilder.withPayload(text + "--" + i).build()); + } + log.info("开始发送..."); + + //把大的消息分裂成若干个小的消息 + MessageSplitter splitter = new MessageSplitter(messageList); + + while (splitter.hasNext()) { + List nextList = splitter.next(); + SendResult result = rocketMQTemplate.syncSend("batch_topic", nextList); + if (result.getSendStatus() == SendStatus.SEND_OK) { + log.info("发送批量消息成功!消息ID为:{}", result.getMsgId()); + } else { + log.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus()); + } + } + log.info("已发送..."); + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/ExProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/ExProducer.java new file mode 100644 index 000000000..a261a75a0 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/ExProducer.java @@ -0,0 +1,32 @@ +package com.youlai.lab.rocketmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author zc + * @date 2022-10-27 00:58 + */ +@Service +@Slf4j +public class ExProducer { + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + */ + @Resource + RocketMQTemplate rocketMQTemplate; + + public void ex() { + String text = "消息额外属性测试" + System.currentTimeMillis(); + log.info(text); + + rocketMQTemplate.syncSend("ex_topic", text); + + log.info("已发送..."); + + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/MessageSplitter.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/MessageSplitter.java new file mode 100644 index 000000000..153c6e68f --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/MessageSplitter.java @@ -0,0 +1,62 @@ +package com.youlai.lab.rocketmq.producer; + +import org.springframework.messaging.Message; + +import java.util.Iterator; +import java.util.List; + +/** + * @author zc + * @date 2022-10-27 00:59 + */ +public class MessageSplitter implements Iterator> { + + /** + * 分割数据大小 + */ + private final int sizeLimit = 1024 * 1024; + ; + + /** + * 分割数据列表 + */ + private final List messages; + + /** + * 分割索引 + */ + private int currIndex; + + public MessageSplitter(List messages) { + this.messages = messages; + // 保证单条数据的大小不大于sizeLimit + messages.forEach(m -> { + if (m.toString().length() > sizeLimit) { + throw new RuntimeException("单挑消息不能大于" + sizeLimit + "B"); + } + }); + } + + + @Override + public boolean hasNext() { + return currIndex < messages.size(); + } + + @Override + public List next() { + int nextIndex = currIndex; + int totalSize = 0; + for (; nextIndex < messages.size(); nextIndex++) { + Message t = messages.get(nextIndex); + totalSize = totalSize + t.toString().length(); + if (totalSize > sizeLimit) { + break; + } + } + List subList = messages.subList(currIndex, nextIndex); + currIndex = nextIndex; + return subList; + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/OrderProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/OrderProducer.java new file mode 100644 index 000000000..c8d1de98b --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/OrderProducer.java @@ -0,0 +1,40 @@ +package com.youlai.lab.rocketmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.concurrent.TimeUnit; + +/** + * @author zc + * @date 2022-10-27 00:59 + */ +@Service +@Slf4j +public class OrderProducer { + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + */ + @Resource + RocketMQTemplate rocketMQTemplate; + + public void order() { + log.info("顺序消息"); + try { + for (int i = 1; i <= 10; i++) { + int num = (int) (Math.random() * 10000); + // 设置一个延时,表示同一个消息先后进入到队形中 + TimeUnit.MILLISECONDS.sleep(50); + log.info("顺序消息,ID:" + num); + // 第一个参数为topic,第二个参数为内容,第三个参数为Hash值,不同hash值在不同的队列中 + rocketMQTemplate.syncSendOrderly("order_topic", "顺序消息,ID:" + num, "order"); + } + log.info("已发送..."); + } catch (Exception e) { + e.printStackTrace(); + } + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/ReplyProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/ReplyProducer.java new file mode 100644 index 000000000..6bcfcb6c5 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/ReplyProducer.java @@ -0,0 +1,33 @@ +package com.youlai.lab.rocketmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author zc + * @date 2022-10-27 00:59 + */ +@Service +@Slf4j +public class ReplyProducer { + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + */ + @Resource + RocketMQTemplate rocketMQTemplate; + + public void reply() { + + // 如果消费者没有回馈消息,则不会发送下一条消息 + for (int i = 1; i <= 10; i++) { + String text = "回馈消息" + "--" + i; + log.info("发送" + text); + Object obj = rocketMQTemplate.sendAndReceive("reply_topic", text, String.class); + log.info("消费者返回的消息:" + obj); + } + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/SQLProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/SQLProducer.java new file mode 100644 index 000000000..2669a9bd2 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/SQLProducer.java @@ -0,0 +1,44 @@ +package com.youlai.lab.rocketmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.Map; + +/** + * @author zc + * @date 2022-10-27 01:00 + */ +@Service +@Slf4j +public class SQLProducer { + + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + */ + @Resource + RocketMQTemplate rocketMQTemplate; + + /** + * SQL92过滤消息 + */ + public void selector() { + + String text = "SQL92过滤消息" + System.currentTimeMillis(); + log.info(text); + + Message message = MessageBuilder.withPayload(text).build(); + // 设置参数 + Map map = new HashMap<>(4); + map.put("a", 2); + map.put("b", 10); + rocketMQTemplate.convertAndSend("sql_topic", message, map); + log.info("已发送..."); + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/ScheduledProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/ScheduledProducer.java new file mode 100644 index 000000000..3e2d1feb2 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/ScheduledProducer.java @@ -0,0 +1,35 @@ +package com.youlai.lab.rocketmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author zc + * @date 2022-10-27 01:00 + */ +@Service +@Slf4j +public class ScheduledProducer { + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + */ + @Resource + RocketMQTemplate rocketMQTemplate; + + public void scheduled() { + String text = "延时消息"+ System.currentTimeMillis(); + log.info(text); + + // 设置延时等级2,这个消息将在5s之后发送 + // 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h + Message message = MessageBuilder.withPayload(text).build(); + rocketMQTemplate.syncSend("scheduled_topic", message, 1000, 2); + + log.info("已发送..."); + } +} diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/TagProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/TagProducer.java new file mode 100644 index 000000000..33cbf439b --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/TagProducer.java @@ -0,0 +1,33 @@ +package com.youlai.lab.rocketmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author zc + * @date 2022-10-27 01:00 + */ +@Service +@Slf4j +public class TagProducer { + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + */ + @Resource + RocketMQTemplate rocketMQTemplate; + + public void tag() { + String text = "标签过滤消息" + System.currentTimeMillis(); + log.info(text); + + // 任何类型的send方法均可以指定TAG,默认可以不指定则为* + Message message = MessageBuilder.withPayload(text).build(); + rocketMQTemplate.syncSend("tag_topic:TAG-A", message); + log.info("已发送..."); + } +} diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/TestProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/TestProducer.java new file mode 100644 index 000000000..a290464a7 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/TestProducer.java @@ -0,0 +1,32 @@ +package com.youlai.lab.rocketmq.producer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; + +/** + * @author zc + * @date 2022-10-27 01:00 + */ +@Service +@Slf4j +public class TestProducer { + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + */ + @Resource + private RocketMQTemplate rocketMQTemplate; + + public void send() { + String text = "测试发送"; + Message message = MessageBuilder.withPayload(text).build(); + log.info("开始发送..."); + rocketMQTemplate.send("test_topic", message); + log.info("已发送..."); + } +} + diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/tx/TxProducer.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/tx/TxProducer.java new file mode 100644 index 000000000..ee1b040e6 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/tx/TxProducer.java @@ -0,0 +1,41 @@ +package com.youlai.lab.rocketmq.producer.tx; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.UUID; + +/** + * @author zc + * @date 2022-10-27 00:56 + */ +@Service +@Slf4j +public class TxProducer { + /** + * 测试发送将参数topic定死,实际开发写入到配置文件 + * 一个RocketMQTemplate只能注册一个事务监听器, + * 如果存在多个事务监听器监听不同的`Producer`, + * 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate + */ + @Resource(name = "txRocketMQTemplate") + RocketMQTemplate rocketMQTemplate; + + public void tx() { + String text = "消息事务发送" + System.currentTimeMillis(); + log.info(text); + UUID transactionId = UUID.randomUUID(); + log.info("事务ID:" + transactionId); + Message message = MessageBuilder.withPayload(text) + // 设置事务Id + .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId) + .build(); + rocketMQTemplate.sendMessageInTransaction("tx_topic", message, null); + log.info("已发送..."); + } +} diff --git a/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/tx/TxProducerListener.java b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/tx/TxProducerListener.java new file mode 100644 index 000000000..b6f9c9c33 --- /dev/null +++ b/youlai-lab/src/main/java/com/youlai/lab/rocketmq/producer/tx/TxProducerListener.java @@ -0,0 +1,111 @@ +package com.youlai.lab.rocketmq.producer.tx; + +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; +import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; +import org.apache.rocketmq.spring.support.RocketMQHeaders; +import org.springframework.messaging.Message; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author zc + * @date 2022-10-27 00:56 + */ +@Slf4j +@RocketMQTransactionListener(rocketMQTemplateBeanName = "txRocketMQTemplate") +public class TxProducerListener implements RocketMQLocalTransactionListener { + + /** + * 记录各个事务Id的状态:1-正在执行,2-执行成功,3-执行失败 + */ + private ConcurrentHashMap transMap = new ConcurrentHashMap<>(); + + /** + * 执行本地事务 + * + * @param msg + * @param arg + * @return + */ + @Override + public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { + // 执行本地事务 + String transId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID).toString(); + log.info("消息事务id为:" + transId); + // 状态为正在执行 + transMap.put(transId, 1); + try { + log.info("正在执行本地事务"); + + // 模拟耗时操作估计出发mq回查操作:当RocketMQ长时间(1分钟)没有收到本地事务的返回结果 + // TimeUnit.SECONDS.sleep(80); + + // 模拟业代码执行,比如模拟插入user数据到数据库中,并且失败的情况 + System.out.println(1 / 0); + + log.info("事务执行完成."); + } catch (Exception e) { + // 状态为执行失败 + transMap.put(transId, 3); + log.error("事务执行异常."); + + // 出现异常 + // 如果不需要重试 则设置为:ROLLBACK + // 如果需要检查事务重试,1分钟后发起检查 则设置为:UNKNOWN + return RocketMQLocalTransactionState.UNKNOWN; + } + // 状态为执行成功 + transMap.put(transId, 2); + return RocketMQLocalTransactionState.COMMIT; + } + + + /** + * 事务超时,回查方法 + * 检查本地事务,如果RocketMQ长时间(1分钟左右)没有收到本地事务的返回结果,则会定时主动执行改方法,查询本地事务执行情况。 + * + * @param msg + * @return + */ + @Override + public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { + + //根据transaction的id回查该事务的状态,并返回给消息队列 + //未知状态:查询事务状态,但始终无结果,或者由于网络原因发送不成功,对mq来说都是未知状态,LocalTransactionState.UNKNOW + //正确提交返回LocalTransactionState.COMMIT_MESSAGE + //事务执行失败返回LocalTransactionState.ROLLBACK_MESSAGE + String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); + Integer status = transMap.get(transId); + // 执行状态 1-正在执行,2-执行成功,3-执行失败 + log.info("回查的事务id为:" + transId + ",当前的状态为:" + status); + //正在执行 + if (status == 1) { + log.info("回查结果为:正在执行状态"); + return RocketMQLocalTransactionState.UNKNOWN; + } else if (status == 2) { + //执行成功,返回commit + log.info("回查结果为:成功状态"); + return RocketMQLocalTransactionState.COMMIT; + } else if (status == 3) { + //执行失败,返回rollback + log.info("回查结果为:失败状态"); + // 通过伪代码表示 检查本地事务执行情况 + // User user = selectByUserId(userId); + // if (user!=null) { + // //插入成功(本地事务完成) + // return RocketMQLocalTransactionState.COMMIT; + // } else { + // // 插入失败 + // // 如果不需要再重试 则设置为:ROLLBACK + // // 如果还需要检查事务重试 则设置为:UNKNOWN + // return RocketMQLocalTransactionState.UNKNOWN; + // } + return RocketMQLocalTransactionState.ROLLBACK; + } + + // 其他未知情况,统一返回不重试,删除消息 + return RocketMQLocalTransactionState.ROLLBACK; + } +} diff --git a/youlai-lab/src/test/java/com/youlai/lab/rocketmq/TestProducer.java b/youlai-lab/src/test/java/com/youlai/lab/rocketmq/TestProducer.java new file mode 100644 index 000000000..66e4de2eb --- /dev/null +++ b/youlai-lab/src/test/java/com/youlai/lab/rocketmq/TestProducer.java @@ -0,0 +1,11 @@ +package com.youlai.lab.rocketmq; + +import org.apache.rocketmq.common.message.Message; +import org.springframework.messaging.support.MessageBuilder; + +import javax.annotation.Resource; + +public class TestProducer { + + +}