CAS+失败重试方式实现数据库的原子性更新

>>强大,10k+点赞的 SpringBoot 后台管理系统竟然出了详细教程!

点击关注公众号,利用碎片时间学习

在数据库修改单条数据时,常用的方式是select for update的悲观锁机制,如果锁竞争比较大,没有获得锁的操作会阻塞。使用CAS乐观锁的方式,可以大大提高并发性。例如,在分布式服务中,多个用户并发下单操作前会先扣减库存时,网上盗图,服务1,服务2和服务3为不同机器上的库存服务。

库存扣减操作流程如下:

CAS+失败重试方式实现数据库的原子性更新

使用cas方式的乐观锁,当库存还剩3个,3个用户同时下单,服务同时扣减库存,可以并发地扣减成功,提高了并发性。如果库存还剩1个,3个用户同时下单,同时扣减库存,这时只有1个用户会操作成功,其余2个会失败,避免了超卖。

在库存的CAS操作中,先进行库存查询操作,然后根据value+version方式修改库存,如果操作失败,则幂等地重复操作。通常会从操作次数和执行时间两个条件限制CAS的操作,如果两个条件中有某个条件触发,可以抛出乐观锁异常。

在Java项目中,通过AOP+注解的方式实现数据库操作的CAS幂等操作的统一处理。定义OptimisticRetry注解,标识CAS重试Spring AOP的切点,并且设置属性value(最大重试次数条件),以及属性maxExecuteTime(最大执行时间条件)。OptimisticRetryAOP定义CAS重试的切面,乐观锁的实现。OptimisticRetry注解定义代码如下:

/**
 * 乐观锁的重试
 **/

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface OptimisticRetry {
 
    /**
     * 最大重试次数
     */

    int value() default 200;
 
    /**
     * 最大执行时间
     */

    int maxExecuteTime() default 2 * 60 * 60 * 1000;
}

OptimisticRetryAOP切面,定义标注了OptimisticRetry注解的方法则进行,CAS幂等重试,如果达到最大重试次数限制或者最大执行时间限制,则抛出乐观锁异常OptimisticLockingFailureException。

代码如下:

/**
 * 乐观锁的重试
 **/

@Aspect
@Component
@Order(10000)
@Slf4j
public class OptimisticRetryAOP {
 
    @Pointcut("@annotation(com.smcx.winemall.common.lock.OptimisticRetry)|| @within(com.smcx.winemall.common.lock.OptimisticRetry)")
    public void optimisticRetryPointcut() {
 
    }
 
    @Around("optimisticRetryPointcut()")
    public Object doConcurrentOperation(ProceedingJoinPoint joinPoint) throws Throwable {
 
        Signature signature = joinPoint.getSignature();
        MethodSignature methodSignature = (MethodSignature) signature;
        // 代理目标对象Class
        Class targetClazz = joinPoint.getTarget().getClass();
        Method method = targetClazz.getMethod(methodSignature.getName(), methodSignature.getParameterTypes());
        Optional<OptimisticRetry> optimisticRetryOption = getOptimisticRetry(method.getAnnotations());
        if (!optimisticRetryOption.isPresent()) {
 
            log.warn("no OptimisticRetry Annotation to execute!");
            return joinPoint.proceed();
        }
        OptimisticRetryConfig optimisticRetryConfig = obtainOptimisticRetryConfig(optimisticRetryOption.get());
        int numAttempts = 0;
        OptimisticLockingFailureException lockFailureException;
        long startTime = System.currentTimeMillis();
        do {
            numAttempts++;
            try {
                return joinPoint.proceed();
            } catch (OptimisticLockingFailureException ex) {
 
                lockFailureException = ex;
                long executeTime = System.currentTimeMillis() - startTime;
                long maxExecuteTime = optimisticRetryConfig.getMaxExecuteTime();
                if (isLargerThanMaxExecuteTime(executeTime, maxExecuteTime)) {
 
                    log.warn("throw optimistic locking failure exception!num attempts [{}],start time [{}]," +
                                    "actual execute time [{}] ms, max execute time [{}] ms",
                            numAttempts, startTime, executeTime, maxExecuteTime);
                    throw lockFailureException;
                }
            }
        }
        while (numAttempts <= optimisticRetryConfig.getMaxTryCount());
 
        log.warn("throw optimistic locking failure exception!num attempts {} ", numAttempts);
        throw lockFailureException;
    }
 
    /**
     * 大于限制的重试执行时间
     *
     * @param executeTime
     * @return
     */

    private boolean isLargerThanMaxExecuteTime(long executeTime, long maxExecuteTime) {
 
        if (executeTime <= 0) {
            return false;
        }
        if (maxExecuteTime > executeTime) {
            return true;
        }
        return false;
    }
 
    private OptimisticRetryConfig obtainOptimisticRetryConfig(OptimisticRetry optimisticRetry) {
 
        // 从注解中获取配的值
        OptimisticRetryConfig optimisticRetryConfig = new OptimisticRetryConfig();
        optimisticRetryConfig.setMaxTryCount(optimisticRetry.value());
        optimisticRetryConfig.setMaxExecuteTime(optimisticRetry.maxExecuteTime());
        return optimisticRetryConfig;
    }
 
    private Optional<OptimisticRetry> getOptimisticRetry(Annotation[] annotations) {
 
        if (ArrayUtils.isEmpty(annotations)) {
            return Optional.empty();
        }
        for (Annotation anno : annotations) {
            if (anno.annotationType().getName().equals(OptimisticRetry.class.getName())) {
                return Optional.of((OptimisticRetry) anno);
            }
        }
        return Optional.empty();
    }
 
    /**
     * 重试配置
     */

    @Data
    private static class OptimisticRetryConfig implements Serializable {
 
        private static final long serialVersionUID = -182211651320526367L;
 
        /**
         * 最大重试次数
         */

        private int maxTryCount;
 
        /**
         * 最大执行时间
         */

        private long maxExecuteTime;
    }
}

业务代码中操作,代码如下:

    @Override
    @Transactional(rollbackFor = Exception.class)
    @OptimisticRetry
    public void decreaseStockRemainingAmount(OperateStockDto operateStock
{
 
        // 获取存储
        WinemallStock existStock = winemallStockMapper.selectByOperateStock(operateStock);
        // TODO 库存是否充足判断等条件判断
 
        // 修改库存数据
        if (0 == winemallStockMapper.updateStockRemainingAmount(existStock, 1)) {
            throw new OptimisticLockingFailureException("decrease stock remaining amount optimistic locking failure!");
        }
    }

来源:blog.csdn.net/new_com/article/details/103834871

推荐:

主流Java进阶技术(学习资料分享)

CAS+失败重试方式实现数据库的原子性更新
PS:因为公众号平台更改了推送规则,如果不想错过内容,记得读完点一下“在看”,加个“星标”,这样每次新文章推送才会第一时间出现在你的订阅列表里。“在看”支持我们吧!

原文始发于微信公众号(Java笔记虾):CAS+失败重试方式实现数据库的原子性更新