feat: Seata TCC模式改造

1、Seata TCC 订单功能添加
2、冻结库存TCC模式添加
This commit is contained in:
99424 2021-07-16 10:50:16 +08:00
parent 349705027c
commit f6e5e6bd07
10 changed files with 402 additions and 14 deletions

View File

@ -29,6 +29,11 @@ public interface IOrderService extends IService<OmsOrder> {
*/
OrderSubmitVO submit(OrderSubmitDTO orderSubmitDTO) ;
/**
* 订单提交
*/
OrderSubmitVO submitTcc(OrderSubmitDTO orderSubmitDTO) ;
/**
* 订单支付
*/

View File

@ -15,17 +15,18 @@ import com.youlai.mall.oms.enums.OrderStatusEnum;
import com.youlai.mall.oms.enums.OrderTypeEnum;
import com.youlai.mall.oms.enums.PayTypeEnum;
import com.youlai.mall.oms.mapper.OrderMapper;
import com.youlai.mall.oms.pojo.entity.OmsOrder;
import com.youlai.mall.oms.pojo.entity.OmsOrderItem;
import com.youlai.mall.oms.pojo.dto.OrderConfirmDTO;
import com.youlai.mall.oms.pojo.dto.OrderItemDTO;
import com.youlai.mall.oms.pojo.dto.OrderSubmitDTO;
import com.youlai.mall.oms.pojo.entity.OmsOrder;
import com.youlai.mall.oms.pojo.entity.OmsOrderItem;
import com.youlai.mall.oms.pojo.vo.CartVO;
import com.youlai.mall.oms.pojo.vo.OrderConfirmVO;
import com.youlai.mall.oms.pojo.vo.OrderSubmitVO;
import com.youlai.mall.oms.service.ICartService;
import com.youlai.mall.oms.service.IOrderItemService;
import com.youlai.mall.oms.service.IOrderService;
import com.youlai.mall.oms.tcc.service.SeataTccOrderService;
import com.youlai.mall.pms.api.SkuFeignClient;
import com.youlai.mall.pms.pojo.dto.SkuDTO;
import com.youlai.mall.pms.pojo.dto.SkuLockDTO;
@ -56,16 +57,17 @@ import static com.youlai.mall.oms.constant.OmsConstants.*;
@Service
public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> implements IOrderService {
private ICartService cartService;
private SkuFeignClient skuFeignService;
private MemberAddressFeignClient addressFeignService;
private IOrderItemService orderItemService;
private RabbitTemplate rabbitTemplate;
private StringRedisTemplate redisTemplate;
private ThreadPoolExecutor threadPoolExecutor;
private MemberFeignClient memberFeignClient;
private final ICartService cartService;
private final SkuFeignClient skuFeignService;
private final MemberAddressFeignClient addressFeignService;
private final IOrderItemService orderItemService;
private final RabbitTemplate rabbitTemplate;
private final StringRedisTemplate redisTemplate;
private final ThreadPoolExecutor threadPoolExecutor;
private final MemberFeignClient memberFeignClient;
private BusinessNoGenerator businessNoGenerator;
private final BusinessNoGenerator businessNoGenerator;
private final SeataTccOrderService seataTccOrderService;
/**
* 订单确认
@ -175,7 +177,6 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
if (!Result.success().getCode().equals(lockResult.getCode())) {
throw new BizException(Result.failed().getMsg());
}
// 创建订单(状态待支付)
OmsOrder order = new OmsOrder();
order.setOrderSn(orderToken) // 把orderToken赋值给订单编号!
@ -213,6 +214,63 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
return submitVO;
}
@Override
@GlobalTransactional(rollbackFor = Exception.class)
public OrderSubmitVO submitTcc(OrderSubmitDTO submitDTO) {
log.info("=======================订单提交=======================\n订单提交信息{}", submitDTO);
// 订单重复提交校验
String orderToken = submitDTO.getOrderToken();
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LOCK_LUA_SCRIPT, Long.class);
Long result = this.redisTemplate.execute(redisScript, Collections.singletonList(ORDER_TOKEN_PREFIX + orderToken), orderToken);
if (!ObjectUtil.equals(result, RELEASE_LOCK_SUCCESS_RESULT)) {
throw new BizException("订单不可重复提交");
}
List<OrderItemDTO> orderItems = submitDTO.getOrderItems();
if (CollectionUtil.isEmpty(orderItems)) {
throw new BizException("订单没有商品,请选择商品后提交");
}
// 订单验价
Long currentTotalPrice = orderItems.stream().map(item -> {
SkuDTO sku = skuFeignService.getSkuById(item.getSkuId()).getData();
if (sku != null) {
return sku.getPrice() * item.getCount();
}
return 0l;
}).reduce(0l, Long::sum);
if (currentTotalPrice.compareTo(submitDTO.getTotalPrice()) != 0) {
throw new BizException("页面已过期,请重新刷新页面再提交");
}
// 校验库存是否足够和锁库存
List<SkuLockDTO> skuLockList = orderItems.stream()
.map(item -> SkuLockDTO.builder().skuId(item.getSkuId())
.count(item.getCount())
.orderToken(orderToken)
.build())
.collect(Collectors.toList());
Result lockResult = skuFeignService.lockStock(skuLockList);
if (!Result.success().getCode().equals(lockResult.getCode())) {
throw new BizException(Result.failed().getMsg());
}
// TCC模式创建订单(状态待支付)
OmsOrder order = seataTccOrderService.prepareSubmitOrder(null, submitDTO);
// 将订单放入延时队列超时未支付由交换机order.exchange切换到死信队列完成系统自动关单
log.info("订单超时取消RabbitMQ消息发送订单SN{}", orderToken);
rabbitTemplate.convertAndSend("order.exchange", "order.create", orderToken);
OrderSubmitVO submitVO = new OrderSubmitVO();
submitVO.setOrderId(order.getId());
submitVO.setOrderSn(order.getOrderSn());
log.info("订单提交响应:{}", submitVO.toString());
return submitVO;
}
/**
* 订单支付
@ -310,5 +368,4 @@ public class OrderServiceImpl extends ServiceImpl<OrderMapper, OmsOrder> impleme
page.setRecords(list);
return page;
}
}

View File

@ -0,0 +1,26 @@
package com.youlai.mall.oms.tcc.idempotent;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
/**
* @Author DaniR
* @Description TCC幂等工具类
* @Date 2021/7/15 20:38
**/
public class IdempotentUtil {
private static Table<Class<?>,String,Long> map= HashBasedTable.create();
public static void addMarker(Class<?> clazz,String xid,Long marker){
map.put(clazz,xid,marker);
}
public static Long getMarker(Class<?> clazz,String xid){
return map.get(clazz,xid);
}
public static void removeMarker(Class<?> clazz,String xid){
map.remove(clazz,xid);
}
}

View File

@ -0,0 +1,20 @@
package com.youlai.mall.oms.tcc.service;
import com.youlai.mall.oms.pojo.dto.OrderSubmitDTO;
import com.youlai.mall.oms.pojo.entity.OmsOrder;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
@LocalTCC
public interface SeataTccOrderService {
@TwoPhaseBusinessAction(name = "prepareSubmitOrder", commitMethod = "commitSubmitOrder", rollbackMethod = "rollbackSubmitOrder")
OmsOrder prepareSubmitOrder(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "orderSubmitDTO") OrderSubmitDTO orderSubmitDTO);
boolean commitSubmitOrder(BusinessActionContext businessActionContext);
boolean rollbackSubmitOrder(BusinessActionContext businessActionContext);
}

View File

@ -0,0 +1,104 @@
package com.youlai.mall.oms.tcc.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.youlai.common.web.util.JwtUtils;
import com.youlai.mall.oms.enums.OrderStatusEnum;
import com.youlai.mall.oms.enums.OrderTypeEnum;
import com.youlai.mall.oms.pojo.dto.OrderItemDTO;
import com.youlai.mall.oms.pojo.dto.OrderSubmitDTO;
import com.youlai.mall.oms.pojo.entity.OmsOrder;
import com.youlai.mall.oms.pojo.entity.OmsOrderItem;
import com.youlai.mall.oms.service.IOrderItemService;
import com.youlai.mall.oms.service.IOrderService;
import com.youlai.mall.oms.tcc.idempotent.IdempotentUtil;
import com.youlai.mall.oms.tcc.service.SeataTccOrderService;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.LocalTCC;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@Slf4j
@LocalTCC
public class SeataTccOrderServiceImpl implements SeataTccOrderService {
@Autowired
private IOrderService orderService;
@Autowired
private IOrderItemService orderItemService;
@Transactional
@Override
public OmsOrder prepareSubmitOrder(BusinessActionContext businessActionContext, OrderSubmitDTO orderSubmitDTO) {
log.info("==========创建 订单 第一阶段事务组Xid:{} ==========", businessActionContext.getXid());
List<OrderItemDTO> orderItems = orderSubmitDTO.getOrderItems();
String orderToken = orderSubmitDTO.getOrderToken();
// 创建订单(状态待支付)
OmsOrder order = new OmsOrder();
order.setOrderSn(orderToken)
.setStatus(OrderStatusEnum.PENDING_PAYMENT.getCode())
.setSourceType(OrderTypeEnum.APP.getCode())
.setMemberId(JwtUtils.getUserId())
.setRemark(orderSubmitDTO.getRemark())
.setPayAmount(orderSubmitDTO.getPayAmount())
.setTotalQuantity(orderItems.stream().map(item -> item.getCount()).reduce(0, (x, y) -> x + y))
.setTotalAmount(orderItems.stream().map(item -> item.getPrice() * item.getCount()).reduce(0l, (x, y) -> x + y))
.setGmtCreate(new Date());
orderService.save(order);
// 创建订单商品
List<OmsOrderItem> orderItemList = orderItems.stream().map(item -> OmsOrderItem.builder()
.orderId(order.getId())
.skuId(item.getSkuId())
.skuName(item.getSkuName())
.skuPrice(item.getPrice())
.skuPic(item.getPic())
.skuQuantity(item.getCount())
.skuTotalPrice(item.getCount() * item.getPrice())
.skuCode(item.getSkuCode())
.build()).collect(Collectors.toList());
orderItemService.saveBatch(orderItemList);
log.info("保存订单:{} 成功", order.getOrderSn());
IdempotentUtil.addMarker(getClass(), businessActionContext.getXid(), System.currentTimeMillis());
return order;
}
@Override
@Transactional
public boolean commitSubmitOrder(BusinessActionContext businessActionContext) {
log.info("==========已经完成Confirm-->创建 订单 第二阶段提交,事务组Xid:{} ==========", businessActionContext.getXid());
if (Objects.isNull(IdempotentUtil.getMarker(getClass(), businessActionContext.getXid()))) {
log.info("已执行过Confirm阶段");
return true;
}
IdempotentUtil.removeMarker(getClass(), businessActionContext.getXid());
return true;
}
@Override
@Transactional
public boolean rollbackSubmitOrder(BusinessActionContext businessActionContext) {
log.info("======================rollbackSubmitOrder========================");
if (Objects.isNull(IdempotentUtil.getMarker(getClass(), businessActionContext.getXid()))) {
log.info("已执行过rollback阶段");
return true;
}
JSONObject jsonObject = (JSONObject) businessActionContext.getActionContext("orderSubmitDTO");
OrderSubmitDTO orderSubmitDTO = new OrderSubmitDTO();
BeanUtil.copyProperties(jsonObject, orderSubmitDTO);
OmsOrder omsOrder = orderService.getOne(new LambdaQueryWrapper<OmsOrder>().eq(OmsOrder::getOrderSn, orderSubmitDTO.getOrderToken()));
if (Objects.nonNull(omsOrder)) {
orderItemService.remove(new LambdaQueryWrapper<OmsOrderItem>().eq(OmsOrderItem::getOrderId, omsOrder.getId()));
orderService.deleteOrder(omsOrder.getId());
}
IdempotentUtil.removeMarker(getClass(), businessActionContext.getXid());
log.info("========== 已经完成Cancle-->第二阶段回滚,事务组Xid:{} ==========" + businessActionContext.getXid());
return true;
}
}

View File

@ -14,6 +14,11 @@ public interface IPmsSkuService extends IService<PmsSku> {
*/
boolean lockStock(List<SkuLockDTO> list);
/**
* 锁定库存
*/
boolean lockStockTcc(List<SkuLockDTO> list);
/**
* 解锁库存
*/

View File

@ -13,6 +13,8 @@ import com.youlai.mall.pms.pojo.entity.PmsSku;
import com.youlai.mall.pms.pojo.dto.SkuDTO;
import com.youlai.mall.pms.pojo.dto.SkuLockDTO;
import com.youlai.mall.pms.service.IPmsSkuService;
import com.youlai.mall.pms.tcc.service.SeataTccSkuService;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
@ -35,9 +37,21 @@ public class PmsSkuServiceImpl extends ServiceImpl<PmsSkuMapper, PmsSku> impleme
private RedissonClient redissonClient;
private SeataTccSkuService seataTccSkuService;
@GlobalTransactional
@Override
public boolean lockStockTcc(List<SkuLockDTO> skuLockList) {
seataTccSkuService.prepareSkuLockList(null,skuLockList);
String orderToken = skuLockList.get(0).getOrderToken();
redisTemplate.opsForValue().set(LOCKED_STOCK_PREFIX + orderToken, JSONUtil.toJsonStr(skuLockList));
return true;
}
/**
* 创建订单时锁定库存
*/
@GlobalTransactional
@Override
public boolean lockStock(List<SkuLockDTO> skuLockList) {
log.info("=======================创建订单,开始锁定商品库存=======================");
@ -45,7 +59,7 @@ public class PmsSkuServiceImpl extends ServiceImpl<PmsSkuMapper, PmsSku> impleme
if (CollectionUtil.isEmpty(skuLockList)) {
throw new BizException("锁定的商品列表为空");
}
//prepareSkuLockList(null, skuLockList);
// 锁定商品
skuLockList.forEach(item -> {
RLock lock = redissonClient.getLock(LOCK_SKU_PREFIX + item.getSkuId()); // 获取商品的分布式锁
@ -84,6 +98,7 @@ public class PmsSkuServiceImpl extends ServiceImpl<PmsSkuMapper, PmsSku> impleme
return true;
}
/**
* 订单超时关单解锁库存
*/

View File

@ -0,0 +1,25 @@
package com.youlai.mall.pms.tcc.idempotent;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Table;
/**
* @Author DaniR
* @Description TCC幂等工具类
* @Date 2021/7/15 20:38
**/
public class IdempotentUtil {
private static Table<Class<?>,String,Long> map= HashBasedTable.create();
public static void addMarker(Class<?> clazz,String xid,Long marker){
map.put(clazz,xid,marker);
}
public static Long getMarker(Class<?> clazz,String xid){
return map.get(clazz,xid);
}
public static void removeMarker(Class<?> clazz,String xid){
map.remove(clazz,xid);
}
}

View File

@ -0,0 +1,21 @@
package com.youlai.mall.pms.tcc.service;
import com.youlai.mall.pms.pojo.dto.SkuLockDTO;
import io.seata.rm.tcc.api.BusinessActionContext;
import io.seata.rm.tcc.api.BusinessActionContextParameter;
import io.seata.rm.tcc.api.LocalTCC;
import io.seata.rm.tcc.api.TwoPhaseBusinessAction;
import java.util.List;
@LocalTCC
public interface SeataTccSkuService {
@TwoPhaseBusinessAction(name = "prepareSkuLockList", commitMethod = "commitSkuLockList", rollbackMethod = "rollbackSkuLockList")
boolean prepareSkuLockList(BusinessActionContext businessActionContext,
@BusinessActionContextParameter(paramName = "skuLockList") List<SkuLockDTO> skuLockList);
boolean commitSkuLockList(BusinessActionContext businessActionContext);
boolean rollbackSkuLockList(BusinessActionContext businessActionContext);
}

View File

@ -0,0 +1,110 @@
package com.youlai.mall.pms.tcc.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.youlai.common.web.exception.BizException;
import com.youlai.mall.pms.pojo.dto.SkuLockDTO;
import com.youlai.mall.pms.pojo.entity.PmsSku;
import com.youlai.mall.pms.service.IPmsSkuService;
import com.youlai.mall.pms.tcc.idempotent.IdempotentUtil;
import com.youlai.mall.pms.tcc.service.SeataTccSkuService;
import io.seata.rm.tcc.api.BusinessActionContext;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import static com.youlai.mall.pms.common.constant.PmsConstants.LOCK_SKU_PREFIX;
@Slf4j
@Component
public class SeataTccSkuServiceImpl implements SeataTccSkuService {
@Autowired
private IPmsSkuService iPmsSkuService;
@Autowired
private RedissonClient redissonClient;
@Transactional
@Override
public boolean prepareSkuLockList(BusinessActionContext businessActionContext, List<SkuLockDTO> skuLockList) {
log.info("=======================创建订单,开始锁定商品库存=======================");
//防幂等
if (Objects.nonNull(IdempotentUtil.getMarker(getClass(), businessActionContext.getXid()))) {
log.info("已执行过try阶段");
return true;
}
log.info("锁定商品信息:{}", skuLockList.toString());
if (CollectionUtil.isEmpty(skuLockList)) {
throw new BizException("锁定的商品列表为空");
}
// 锁定商品
skuLockList.forEach(item -> {
RLock lock = redissonClient.getLock(LOCK_SKU_PREFIX + item.getSkuId()); // 获取商品的分布式锁
lock.lock();
boolean result = iPmsSkuService.update(new LambdaUpdateWrapper<PmsSku>()
.setSql("locked_stock = locked_stock + " + item.getCount())
.eq(PmsSku::getId, item.getSkuId())
.apply("stock - locked_stock >= {0}", item.getCount())
);
item.setLocked(result);
lock.unlock();
});
// 锁定失败的商品集合
List<SkuLockDTO> unlockSkuList = skuLockList.stream().filter(item -> !item.getLocked()).collect(Collectors.toList());
if (CollectionUtil.isNotEmpty(unlockSkuList)) {
// 恢复已被锁定的库存
List<SkuLockDTO> lockSkuList = skuLockList.stream().filter(SkuLockDTO::getLocked).collect(Collectors.toList());
lockSkuList.forEach(item ->
iPmsSkuService.update(new LambdaUpdateWrapper<PmsSku>()
.eq(PmsSku::getId, item.getSkuId())
.setSql("locked_stock = locked_stock - " + item.getCount()))
);
// 提示订单哪些商品库存不足
List<Long> ids = unlockSkuList.stream().map(SkuLockDTO::getSkuId).collect(Collectors.toList());
throw new BizException("商品" + ids.toString() + "库存不足");
}
IdempotentUtil.addMarker(getClass(), businessActionContext.getXid(), System.currentTimeMillis());
return true;
}
@Transactional
@Override
public boolean commitSkuLockList(BusinessActionContext businessActionContext) {
log.info("=====================commitSkuLockList 成功=========================");
if (Objects.isNull(IdempotentUtil.getMarker(getClass(), businessActionContext.getXid()))) {
log.info(businessActionContext.getXid() + ": 已执行过commit阶段");
return true;
}
IdempotentUtil.removeMarker(getClass(), businessActionContext.getXid());
return true;
}
@Transactional
@Override
public boolean rollbackSkuLockList(BusinessActionContext businessActionContext) {
log.info("======================rollbackSkuLockList========================");
if (Objects.isNull(IdempotentUtil.getMarker(getClass(), businessActionContext.getXid()))) {
log.info(businessActionContext.getXid() + ": 已执行过rollback阶段");
return true;
}
JSONArray jsonObjectList = (JSONArray) businessActionContext.getActionContext("skuLockList");
List<SkuLockDTO> skuLockList = JSONObject.parseArray(jsonObjectList.toJSONString(), SkuLockDTO.class);
skuLockList.forEach(item ->
iPmsSkuService.update(new LambdaUpdateWrapper<PmsSku>()
.eq(PmsSku::getId, item.getSkuId())
.setSql("locked_stock = locked_stock - " + item.getCount()))
);
IdempotentUtil.removeMarker(getClass(), businessActionContext.getXid());
return true;
}
}