fix(youlai-lab): 添加rocketmq基础收发消息事例

添加rocketmq基础收发消息事例
This commit is contained in:
chuan 2022-10-27 01:21:57 +08:00
parent 48bf574e2f
commit b87a95b835
24 changed files with 1010 additions and 0 deletions

View File

@ -50,6 +50,13 @@
</exclusions>
</dependency>
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-loadbalancer</artifactId>

View File

@ -0,0 +1,27 @@
package com.youlai.lab.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author zc
* @date 2022-10-27 00:49
*/
@Component
@RocketMQMessageListener(selectorExpression = "", topic = "base_topic", consumerGroup = "base_group")
@Slf4j
public class BaseConsumer implements RocketMQListener<String> {
/**
* 测试接收将参数topic定死实际开发写入到配置文件
*
* @param message
*/
@Override
public void onMessage(String message) {
log.info("基本信息案例-接受到消息:" + message);
}
}

View File

@ -0,0 +1,26 @@
package com.youlai.lab.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author zc
* @date 2022-10-27 00:50
*/
@Component
@RocketMQMessageListener(topic = "batch_topic", consumerGroup = "batch_group")
@Slf4j
public class BatchConsumer implements RocketMQListener<String> {
/**
* 测试接收将参数topic定死实际开发写入到配置文件
*
* @param message
*/
@Override
public void onMessage(String message) {
log.info("批量消息-接受到消息:" + message);
}
}

View File

@ -0,0 +1,29 @@
package com.youlai.lab.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
/**
* @author zc
* @date 2022-10-27 00:50
*/
@Component
@RocketMQMessageListener(topic = "ex_topic", consumerGroup = "ex_group")
@Slf4j
public class ExConsumer implements RocketMQListener<MessageExt> {
// 测试接收将参数topic定死实际开发写入到配置文件
@Override
public void onMessage(MessageExt message) {
String msg = new String(message.getBody(), StandardCharsets.UTF_8);
log.info("message对象:{}", message);
log.info("接收消息:{}", msg);
}
}

View File

@ -0,0 +1,25 @@
package com.youlai.lab.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.springframework.stereotype.Component;
/**
* @author zc
* @date 2022-10-27 00:53
*/
@Component
@RocketMQMessageListener(topic = "reply_topic", consumerGroup = "reply_group")
@Slf4j
public class ReplyConsumer implements RocketMQReplyListener<String, byte[]> {
@Override
public byte[] onMessage(String message) {
log.info("接受到消息:" + message);
// 返回消息到生成者
return "返回消息到生产者".getBytes();
}
}

View File

@ -0,0 +1,28 @@
package com.youlai.lab.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author zc
* @date 2022-10-27 00:53
*/
@Component
@RocketMQMessageListener(selectorType = SelectorType.SQL92, selectorExpression = "a between 0 and 6 or b > 8", topic = "sql_topic", consumerGroup = "sql_group")
@Slf4j
public class SQLConsumer implements RocketMQListener<String> {
/**
* 测试接收将参数topic定死实际开发写入到配置文件
*
* @param message
*/
@Override
public void onMessage(String message) {
log.info("SQL92过滤消息-接受到消息:" + message);
}
}

View File

@ -0,0 +1,25 @@
package com.youlai.lab.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author zc
* @date 2022-10-27 00:53
*/
@Component
@RocketMQMessageListener(topic = "scheduled_topic", consumerGroup = "scheduled_group")
@Slf4j
public class ScheduleConsumer implements RocketMQListener<String> {
/**
* 测试接收将参数topic定死实际开发写入到配置文件
* @param message
*/
@Override
public void onMessage(String message) {
log.info("延时消息-接受到消息:" + message);
}
}

View File

@ -0,0 +1,27 @@
package com.youlai.lab.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author zc
* @date 2022-10-27 00:54
*/
@Component
@RocketMQMessageListener(selectorExpression = "TAG-A||TAG-B", topic = "tag_topic", consumerGroup = "tag_group")
@Slf4j
public class TagConsumer implements RocketMQListener<String> {
/**
* 测试接收将参数topic定死实际开发写入到配置文件
*
* @param message
*/
@Override
public void onMessage(String message) {
log.info("标签过滤消息-接受到消息:" + message);
}
}

View File

@ -0,0 +1,27 @@
package com.youlai.lab.rocketmq.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* @author zc
* @date 2022-10-27 00:54
*/
@Component
@RocketMQMessageListener(topic = "test_topic", consumerGroup = "test_group")
@Slf4j
public class TestConsumer implements RocketMQListener<String> {
/**
* 测试接收将参数topic定死实际开发写入到配置文件
*
* @param message
*/
@Override
public void onMessage(String message) {
log.info("TestConsumer - 接受到消息:" + message);
}
}

View File

@ -0,0 +1,179 @@
package com.youlai.lab.rocketmq.controller;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import com.youlai.lab.rocketmq.producer.*;
import com.youlai.lab.rocketmq.producer.tx.TxProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* @author zc
* @date 2022-10-27 01:01
*/
@Api(tags = "测试接口")
@RestController
@RequestMapping("test")
public class TestController {
/**
* 搭建测试流程生成者
*/
@Resource
private TestProducer testProducer;
/**
* 基本信息案例
*/
@Resource
private BaseProducer baseProducer;
/**
* 顺序消息发送样例
*/
@Resource
private OrderProducer orderProducer;
/**
* 延时消息
*/
@Resource
private ScheduledProducer scheduledProducer;
/**
* 指标标签
*/
@Resource
private TagProducer tagProducer;
/**
* 过滤消息
*/
@Resource
private SQLProducer SQLProducer;
/**
* 消息事务
*/
@Resource
private TxProducer txProducer;
/**
* 消息额外属性测试
*/
@Resource
private ExProducer exProducer;
/**
* 回馈消息样例
*/
@Resource
private ReplyProducer replyProducer;
/**
* 批量消息发送
*/
@Resource
private BatchProducer batchProducer;
@ApiOperation(value = "测试消息生成及发送", notes = "测试消息生成及发送")
@ApiOperationSupport(order = 5)
@GetMapping
public Object test() {
testProducer.send();
return "测试消息生成及发送";
}
@ApiOperation(value = "基本消息样例", notes = "基本消息样例")
@ApiOperationSupport(order = 10)
@GetMapping("/base")
public Object base() {
// 同步发送
baseProducer.sync();
// 异步发送
baseProducer.async();
// 单向发送
baseProducer.oneWay();
return "基本消息样例";
}
@ApiOperation(value = "发送顺序消息", notes = "发送顺序消息")
@ApiOperationSupport(order = 15)
@GetMapping("/order")
public Object order() {
orderProducer.order();
return "发送顺序消息";
}
@ApiOperation(value = "发送延时消息", notes = "发送延时消息")
@ApiOperationSupport(order = 20)
@GetMapping("/scheduled")
public Object scheduled() {
scheduledProducer.scheduled();
return "发送延时消息";
}
@ApiOperation(value = "标签过滤消息样例", notes = "标签过滤消息样例")
@ApiOperationSupport(order = 25)
@GetMapping("/tag")
public Object tag() {
// TAG过滤
tagProducer.tag();
return "指定标签消息";
}
@ApiOperation(value = "SQL92过滤消息样例", notes = "SQL92过滤消息样例")
@ApiOperationSupport(order = 30)
@GetMapping("/selector")
public Object selector() {
// SQL92过滤
SQLProducer.selector();
return "过滤消息样例";
}
@ApiOperation(value = "消息事务样例", notes = "消息事务样例")
@ApiOperationSupport(order = 35)
@GetMapping("/tx")
public Object tx() {
// 消息事务
txProducer.tx();
return "消息事务样例";
}
@ApiOperation(value = "消息额外属性", notes = "消息额外属性")
@ApiOperationSupport(order = 40)
@GetMapping("/ex")
public Object ex() {
// 消息事务
exProducer.ex();
return "消息额外属性";
}
@ApiOperation(value = "回馈消息样例", notes = "回馈消息样例")
@ApiOperationSupport(order = 40)
@GetMapping("/reply")
public Object reply() {
// 消息事务
replyProducer.reply();
return "回馈消息样例";
}
@ApiOperation(value = "批量消息样例", notes = "批量消息样例")
@ApiOperationSupport(order = 45)
@GetMapping("/batch")
public Object batch() {
// 批量消息样例
batchProducer.batch();
return "批量消息样例";
}
}

View File

@ -0,0 +1,9 @@
package com.youlai.lab.rocketmq.ext;
import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
@ExtRocketMQTemplateConfiguration
public class TxRocketMQTemplate extends RocketMQTemplate {
}

View File

@ -0,0 +1,74 @@
package com.youlai.lab.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author zc
* @date 2022-10-27 00:56
*/
@Service
@Slf4j
public class BaseProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
*/
@Resource
RocketMQTemplate rocketMQTemplate;
/**
* 这种可靠性同步地发送方式使用的比较广泛比如重要的消息通知短信通知
*/
public void sync() {
String text = "基本信息案例-同步发送," + System.currentTimeMillis();
log.info(text);
rocketMQTemplate.syncSend("base_topic", text);
log.info("同步发送-已发送...");
}
/**
* 异步消息通常用在对响应时间敏感的业务场景即发送端不能容忍长时间地等待Broker的响应
*/
public void async() {
String text = "基本信息案例-异步发送," + System.currentTimeMillis();
log.info(text);
for (int a = 1; a <= 10; a++) {
int finalA = a;
rocketMQTemplate.asyncSend("base_topic", text + "ID:" + a, new SendCallback() {
// SendCallback接收异步返回结果的回调
// 成功发送
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送 - 发送成功ID:" + finalA);
}
// 发送失败
@Override
public void onException(Throwable throwable) {
log.info("异步发送 - 发送失败");
throwable.printStackTrace();
}
});
}
log.info("异步发送-已发送...");
}
/**
* 这种方式主要用在不特别关心发送结果的场景例如日志发送
*/
public void oneWay() {
String text = "基本信息案例-单向发送" + System.currentTimeMillis();
log.info(text);
rocketMQTemplate.sendOneWay("base_topic", text);
log.info("单向发送-已发送...");
}
}

View File

@ -0,0 +1,53 @@
package com.youlai.lab.rocketmq.producer;
/**
* @author zc
* @date 2022-10-27 00:57
*/
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
@Service
@Slf4j
public class BatchProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
*/
@Resource
RocketMQTemplate rocketMQTemplate;
public void batch() {
String text = "批量消息";
log.info(text);
List<Message> messageList = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
messageList.add(MessageBuilder.withPayload(text + "--" + i).build());
}
log.info("开始发送...");
//把大的消息分裂成若干个小的消息
MessageSplitter splitter = new MessageSplitter(messageList);
while (splitter.hasNext()) {
List<Message> nextList = splitter.next();
SendResult result = rocketMQTemplate.syncSend("batch_topic", nextList);
if (result.getSendStatus() == SendStatus.SEND_OK) {
log.info("发送批量消息成功!消息ID为:{}", result.getMsgId());
} else {
log.info("发送批量消息失败!消息ID为:{},消息状态为:{}", result.getMsgId(), result.getSendStatus());
}
}
log.info("已发送...");
}
}

View File

@ -0,0 +1,32 @@
package com.youlai.lab.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author zc
* @date 2022-10-27 00:58
*/
@Service
@Slf4j
public class ExProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
*/
@Resource
RocketMQTemplate rocketMQTemplate;
public void ex() {
String text = "消息额外属性测试" + System.currentTimeMillis();
log.info(text);
rocketMQTemplate.syncSend("ex_topic", text);
log.info("已发送...");
}
}

View File

@ -0,0 +1,62 @@
package com.youlai.lab.rocketmq.producer;
import org.springframework.messaging.Message;
import java.util.Iterator;
import java.util.List;
/**
* @author zc
* @date 2022-10-27 00:59
*/
public class MessageSplitter implements Iterator<List<Message>> {
/**
* 分割数据大小
*/
private final int sizeLimit = 1024 * 1024;
;
/**
* 分割数据列表
*/
private final List<Message> messages;
/**
* 分割索引
*/
private int currIndex;
public MessageSplitter(List<Message> messages) {
this.messages = messages;
// 保证单条数据的大小不大于sizeLimit
messages.forEach(m -> {
if (m.toString().length() > sizeLimit) {
throw new RuntimeException("单挑消息不能大于" + sizeLimit + "B");
}
});
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message t = messages.get(nextIndex);
totalSize = totalSize + t.toString().length();
if (totalSize > sizeLimit) {
break;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}

View File

@ -0,0 +1,40 @@
package com.youlai.lab.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* @author zc
* @date 2022-10-27 00:59
*/
@Service
@Slf4j
public class OrderProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
*/
@Resource
RocketMQTemplate rocketMQTemplate;
public void order() {
log.info("顺序消息");
try {
for (int i = 1; i <= 10; i++) {
int num = (int) (Math.random() * 10000);
// 设置一个延时表示同一个消息先后进入到队形中
TimeUnit.MILLISECONDS.sleep(50);
log.info("顺序消息,ID:" + num);
// 第一个参数为topic第二个参数为内容第三个参数为Hash值不同hash值在不同的队列中
rocketMQTemplate.syncSendOrderly("order_topic", "顺序消息,ID:" + num, "order");
}
log.info("已发送...");
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,33 @@
package com.youlai.lab.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author zc
* @date 2022-10-27 00:59
*/
@Service
@Slf4j
public class ReplyProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
*/
@Resource
RocketMQTemplate rocketMQTemplate;
public void reply() {
// 如果消费者没有回馈消息则不会发送下一条消息
for (int i = 1; i <= 10; i++) {
String text = "回馈消息" + "--" + i;
log.info("发送" + text);
Object obj = rocketMQTemplate.sendAndReceive("reply_topic", text, String.class);
log.info("消费者返回的消息:" + obj);
}
}
}

View File

@ -0,0 +1,44 @@
package com.youlai.lab.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
* @author zc
* @date 2022-10-27 01:00
*/
@Service
@Slf4j
public class SQLProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
*/
@Resource
RocketMQTemplate rocketMQTemplate;
/**
* SQL92过滤消息
*/
public void selector() {
String text = "SQL92过滤消息" + System.currentTimeMillis();
log.info(text);
Message<String> message = MessageBuilder.withPayload(text).build();
// 设置参数
Map<String, Object> map = new HashMap<>(4);
map.put("a", 2);
map.put("b", 10);
rocketMQTemplate.convertAndSend("sql_topic", message, map);
log.info("已发送...");
}
}

View File

@ -0,0 +1,35 @@
package com.youlai.lab.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author zc
* @date 2022-10-27 01:00
*/
@Service
@Slf4j
public class ScheduledProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
*/
@Resource
RocketMQTemplate rocketMQTemplate;
public void scheduled() {
String text = "延时消息"+ System.currentTimeMillis();
log.info(text);
// 设置延时等级2,这个消息将在5s之后发送
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Message<String> message = MessageBuilder.withPayload(text).build();
rocketMQTemplate.syncSend("scheduled_topic", message, 1000, 2);
log.info("已发送...");
}
}

View File

@ -0,0 +1,33 @@
package com.youlai.lab.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author zc
* @date 2022-10-27 01:00
*/
@Service
@Slf4j
public class TagProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
*/
@Resource
RocketMQTemplate rocketMQTemplate;
public void tag() {
String text = "标签过滤消息" + System.currentTimeMillis();
log.info(text);
// 任何类型的send方法均可以指定TAG默认可以不指定则为*
Message<String> message = MessageBuilder.withPayload(text).build();
rocketMQTemplate.syncSend("tag_topic:TAG-A", message);
log.info("已发送...");
}
}

View File

@ -0,0 +1,32 @@
package com.youlai.lab.rocketmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* @author zc
* @date 2022-10-27 01:00
*/
@Service
@Slf4j
public class TestProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
*/
@Resource
private RocketMQTemplate rocketMQTemplate;
public void send() {
String text = "测试发送";
Message<String> message = MessageBuilder.withPayload(text).build();
log.info("开始发送...");
rocketMQTemplate.send("test_topic", message);
log.info("已发送...");
}
}

View File

@ -0,0 +1,41 @@
package com.youlai.lab.rocketmq.producer.tx;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.UUID;
/**
* @author zc
* @date 2022-10-27 00:56
*/
@Service
@Slf4j
public class TxProducer {
/**
* 测试发送将参数topic定死实际开发写入到配置文件
* 一个RocketMQTemplate只能注册一个事务监听器
* 如果存在多个事务监听器监听不同的`Producer`
* 需要通过注解`@ExtRocketMQTemplateConfiguration`定义不同的RocketMQTemplate
*/
@Resource(name = "txRocketMQTemplate")
RocketMQTemplate rocketMQTemplate;
public void tx() {
String text = "消息事务发送" + System.currentTimeMillis();
log.info(text);
UUID transactionId = UUID.randomUUID();
log.info("事务ID" + transactionId);
Message<String> message = MessageBuilder.withPayload(text)
// 设置事务Id
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.build();
rocketMQTemplate.sendMessageInTransaction("tx_topic", message, null);
log.info("已发送...");
}
}

View File

@ -0,0 +1,111 @@
package com.youlai.lab.rocketmq.producer.tx;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author zc
* @date 2022-10-27 00:56
*/
@Slf4j
@RocketMQTransactionListener(rocketMQTemplateBeanName = "txRocketMQTemplate")
public class TxProducerListener implements RocketMQLocalTransactionListener {
/**
* 记录各个事务Id的状态:1-正在执行2-执行成功3-执行失败
*/
private ConcurrentHashMap<String, Integer> transMap = new ConcurrentHashMap<>();
/**
* 执行本地事务
*
* @param msg
* @param arg
* @return
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
String transId = msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID).toString();
log.info("消息事务id为:" + transId);
// 状态为正在执行
transMap.put(transId, 1);
try {
log.info("正在执行本地事务");
// 模拟耗时操作估计出发mq回查操作当RocketMQ长时间(1分钟)没有收到本地事务的返回结果
// TimeUnit.SECONDS.sleep(80);
// 模拟业代码执行,比如模拟插入user数据到数据库中并且失败的情况
System.out.println(1 / 0);
log.info("事务执行完成.");
} catch (Exception e) {
// 状态为执行失败
transMap.put(transId, 3);
log.error("事务执行异常.");
// 出现异常
// 如果不需要重试 则设置为ROLLBACK
// 如果需要检查事务重试,1分钟后发起检查 则设置为UNKNOWN
return RocketMQLocalTransactionState.UNKNOWN;
}
// 状态为执行成功
transMap.put(transId, 2);
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 事务超时回查方法
* 检查本地事务,如果RocketMQ长时间(1分钟左右)没有收到本地事务的返回结果则会定时主动执行改方法查询本地事务执行情况
*
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
//根据transaction的id回查该事务的状态,并返回给消息队列
//未知状态:查询事务状态,但始终无结果,或者由于网络原因发送不成功,对mq来说都是未知状态,LocalTransactionState.UNKNOW
//正确提交返回LocalTransactionState.COMMIT_MESSAGE
//事务执行失败返回LocalTransactionState.ROLLBACK_MESSAGE
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
Integer status = transMap.get(transId);
// 执行状态 1-正在执行2-执行成功3-执行失败
log.info("回查的事务id为:" + transId + ",当前的状态为:" + status);
//正在执行
if (status == 1) {
log.info("回查结果为:正在执行状态");
return RocketMQLocalTransactionState.UNKNOWN;
} else if (status == 2) {
//执行成功,返回commit
log.info("回查结果为:成功状态");
return RocketMQLocalTransactionState.COMMIT;
} else if (status == 3) {
//执行失败,返回rollback
log.info("回查结果为:失败状态");
// 通过伪代码表示 检查本地事务执行情况
// User user = selectByUserId(userId);
// if (user!=null) {
// //插入成功(本地事务完成)
// return RocketMQLocalTransactionState.COMMIT;
// } else {
// // 插入失败
// // 如果不需要再重试 则设置为ROLLBACK
// // 如果还需要检查事务重试 则设置为UNKNOWN
// return RocketMQLocalTransactionState.UNKNOWN;
// }
return RocketMQLocalTransactionState.ROLLBACK;
}
// 其他未知情况统一返回不重试删除消息
return RocketMQLocalTransactionState.ROLLBACK;
}
}

View File

@ -0,0 +1,11 @@
package com.youlai.lab.rocketmq;
import org.apache.rocketmq.common.message.Message;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
public class TestProducer {
}