springboot基于注解实现去重表消息防止重复消费

 更新时间:2025年05月16日 10:20:22   作者:sjsjsbbsbsn  
本文主要介绍了springboot基于注解实现去重表消息防止重复消费,通过记录消息ID、使用分布式锁和设置过期时间,可以确保消息只会被处理一次,具有一定的参考价值,感兴趣的可以了解一下

1. 背景/问题

在分布式系统中,消息队列(如RocketMQ、Kafka)的 消息重复消费 是常见问题,主要原因包括:

  • 网络抖动:生产者或消费者因网络不稳定触发消息重发。
  • 消费者超时:消费者处理时间过长,消息队列误判为失败并重新投递。
  • 集群故障转移:消费者宕机后,未完成的消息会被其他节点重新拉取。

重复消费带来的问题

  • 业务逻辑多次执行(如重复扣款、重复生成订单)。
  • 数据一致性被破坏(如库存超卖、积分累加错误)。
  • 系统资源浪费,影响性能和稳定性。

为了避免这种情况发生,需要在客户端实现一些机制来确保消息不会被重复消费,例如记录消费者已经处理的消息 ID、使用分布式锁来控制消费进程的唯一性等。这些机制能够保证消息被成功处理,同时也能够提高系统的可靠性和稳定性。

2. 什么是幂等性

幂等性 是指对同一操作的多次执行所产生的影响与一次执行的影响相同。

  • 消息消费场景:无论消息被消费多少次,最终结果应与消费一次一致。
  • 实现目标:通过幂等设计,确保业务逻辑的重复执行不会产生副作用。

3. 幂等设计

核心思路

  • 幂等标识:为每条消息生成唯一标识(如业务ID + 消息ID),记录其处理状态。
  • 状态管理:通过数据库或Redis维护幂等标识的状态(如“消费中”“已消费”)。
  • 过期时间:防止因系统崩溃导致状态长期滞留,需设置合理的超时时间(如10分钟)。
[消费者接收消息]  
        │  
        ▼  
[解析消息,生成唯一幂等标识]  
        │  
        ▼  
[查询幂等标识状态]  
        │  
┌───────┴───────┐  
│ 存在且已消费  │           [返回成功,丢弃消息]  
└───────┬───────┘  
        │  
┌───────┴───────┐  
│ 存在且消费中  │           [延迟消费,等待重试]  
└───────┬───────┘  
        │  
┌───────┴───────┐  
│   不存在      │  
└───────┬───────┘  
        │  
[设置幂等标识为“消费中”,并设置过期时间]  
        │  
        ▼  
[执行业务逻辑]  
        │  
        ▼  
[业务执行成功?]  
        │  
┌───────┴───────┐  
│     是        │           [更新标识为“已消费”]  
│               │           [删除或保留标识]  
└───────┬───────┘  
        │  
┌───────┴───────┐  
│     否        │           [删除标识,允许重试]  
└───────┬───────┘  
        │  
        ▼  
[流程结束]  

4.抽象通用幂等组件

消息防重复消费幂等组件是通用的通常会提取出来也可供其他模块/服务 使用

4.1自定义幂等注解

提供了一种通用的幂等注解,并通过 SpEL 的形式生成去重表全局唯一 Key

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface NoMQDuplicateConsume {

    /**
     * 设置防重令牌 Key 前缀
     */
    String keyPrefix() default "";

    /**
     * 通过 SpEL 表达式生成的唯一 Key
     */
    String key();

    /**
     * 设置防重令牌 Key 过期时间,单位秒,默认 1 小时
     */
    long keyTimeout() default 3600L;
}

4.2. 定义幂等枚举

幂等需要设置两个状态,消费中和已消费,创建对应的枚举

@RequiredArgsConstructor
public enum IdempotentMQConsumeStatusEnum {

    /**
     * 消费中
     */
    CONSUMING("0"),

    /**
     * 已消费
     */
    CONSUMED("1");

    @Getter
    private final String code;

    /**
     * 如果消费状态等于消费中,返回失败
     *
     * @param consumeStatus 消费状态
     * @return 是否消费失败
     */
    public static boolean isError(String consumeStatus) {
        return Objects.equals(CONSUMING.code, consumeStatus);
    }
}

4.3.通过 AOP 的方式进行增强注解

如果说方法上加了注解,会被这段 AOP 代码以环绕增强方式执行

@Slf4j
@Aspect
@RequiredArgsConstructor
public final class NoMQDuplicateConsumeAspect {

    private final StringRedisTemplate stringRedisTemplate;

    private static final String LUA_SCRIPT = """
            local key = KEYS[1]
            local value = ARGV[1]
            local expire_time_ms = ARGV[2]
            return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)
            """;

    /**
     * 增强方法标记 {@link NoMQDuplicateConsume} 注解逻辑
     */
    @Around("@annotation(com.nageoffer.onecoupon.framework.idempotent.NoMQDuplicateConsume)")
    public Object noMQRepeatConsume(ProceedingJoinPoint joinPoint) throws Throwable {
        NoMQDuplicateConsume noMQDuplicateConsume = getNoMQDuplicateConsumeAnnotation(joinPoint);
        String uniqueKey = noMQDuplicateConsume.keyPrefix() + SpELUtil.parseKey(noMQDuplicateConsume.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());

        String absentAndGet = stringRedisTemplate.execute(
                RedisScript.of(LUA_SCRIPT, String.class),
                List.of(uniqueKey),
                IdempotentMQConsumeStatusEnum.CONSUMING.getCode(),
                String.valueOf(TimeUnit.SECONDS.toMillis(noMQDuplicateConsume.keyTimeout()))
        );

        // 如果不为空证明已经有
        if (Objects.nonNull(absentAndGet)) {
            boolean errorFlag = IdempotentMQConsumeStatusEnum.isError(absentAndGet);
            log.warn("[{}] MQ repeated consumption, {}.", uniqueKey, errorFlag ? "Wait for the client to delay consumption" : "Status is completed");
            if (errorFlag) {
                throw new ServiceException(String.format("消息消费者幂等异常,幂等标识:%s", uniqueKey));
            }
            return null;
        }

        Object result;
        try {
            // 执行标记了消息队列防重复消费注解的方法原逻辑
            result = joinPoint.proceed();

            // 设置防重令牌 Key 过期时间,单位秒
            stringRedisTemplate.opsForValue().set(uniqueKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), noMQDuplicateConsume.keyTimeout(), TimeUnit.SECONDS);
        } catch (Throwable ex) {
            // 删除幂等 Key,让消息队列消费者重试逻辑进行重新消费
            stringRedisTemplate.delete(uniqueKey);
            throw ex;
        }
        return result;
    }

    /**
     * @return 返回自定义防重复消费注解
     */
    public static NoMQDuplicateConsume getNoMQDuplicateConsumeAnnotation(ProceedingJoinPoint joinPoint) throws NoSuchMethodException {
        MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
        Method targetMethod = joinPoint.getTarget().getClass().getDeclaredMethod(methodSignature.getName(), methodSignature.getMethod().getParameterTypes());
        return targetMethod.getAnnotation(NoMQDuplicateConsume.class);
    }

lua脚本解释

local key = KEYS[1] # 第一个 Key,即幂等唯一标识 uniqueKey
local value = ARGV[1] # 第一个参数,即初始化幂等消费状态,为消费中
local expire_time_ms = ARGV[2] # 第二个参数,即幂等 Key 过期时间

return redis.call('SET', key, value, 'NX', 'GET', 'PX', expire_time_ms)

该脚本的主要作用是:在 Redis 中尝试以 NX 方式设置一个键,即如果键不存在,则设置新值,并返回设置之前的旧值,同时为该键设置过期时间(以毫秒为单位)。

获取到 Redis 里面的 Key 值后,可能会有三个流程执行:

  • absentAndGet 为空:代表消息是第一次到达,执行完 LUA 脚本后,会在 Redis 设置 Key 的 Value 值为 0,消费中状态。
  • absentAndGet 为 0:代表已经有相同消息到达并且还没有处理完,会通过抛异常的形式让 RocketMQ 重试。
  • absentAndGet 为 1:代表已经有相同消息消费完成,返回空表示不执行任何处理。

4.4.注册为 Spring Bean

另外可以看看另一篇基于分布式锁注解防重复提交

https://blog.csdn.net/sjsjsbbsbsn/article/details/145131305?spm=1001.2014.3001.5501

public class IdempotentConfiguration {
    /**
     * 防止消息队列消费者重复消费消息切面控制器
     */
    @Bean
    public NoMQDuplicateConsumeAspect noMQDuplicateConsumeAspect(StringRedisTemplate stringRedisTemplate) {
        return new NoMQDuplicateConsumeAspect(stringRedisTemplate);
    }
}

4.5EL工具类

public class SpELUtil {
    /**
     * 校验并返回实际使用的 spEL 表达式
     *
     * @param spEl spEL 表达式
     * @return 实际使用的 spEL 表达式
     */
    public static Object parseKey(String spEl, Method method, Object[] contextObj) {
        List<String> spELFlag = ListUtil.of("#", "T(");
        Optional<String> optional = spELFlag.stream().filter(spEl::contains).findFirst();
        if (optional.isPresent()) {
            return parse(spEl, method, contextObj);
        }
        return spEl;
    }

    /**
     * 转换参数为字符串
     *
     * @param spEl       spEl 表达式
     * @param contextObj 上下文对象
     * @return 解析的字符串值
     */
    public static Object parse(String spEl, Method method, Object[] contextObj) {
        DefaultParameterNameDiscoverer discoverer = new DefaultParameterNameDiscoverer();
        ExpressionParser parser = new SpelExpressionParser();
        Expression exp = parser.parseExpression(spEl);
        String[] params = discoverer.getParameterNames(method);
        StandardEvaluationContext context = new StandardEvaluationContext();
        if (ArrayUtil.isNotEmpty(params)) {
            for (int len = 0; len < params.length; len++) {
                context.setVariable(params[len], contextObj[len]);
            }
        }
        return exp.getValue(context);
    }
}

5.实战使用

使用天机学堂项目来进行实战

5.1写入common模块

在这里插入图片描述

5.2使用

在这里插入图片描述

直接加上注解就可以

但是实际上这里不存在幂等问题,因为userId和courseId设置了唯一索引,所以这里不存在幂等性,不需要加上幂等注解

到此这篇关于springboot基于注解实现去重表消息防止重复消费的文章就介绍到这了,更多相关springboot注解防止重复消费内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring事件监听源码解析流程分析

    Spring事件监听源码解析流程分析

    spring事件监听机制离不开容器IOC特性提供的支持,比如容器会自动创建事件发布器,自动识别用户注册的监听器并进行管理,在特定的事件发布后会找到对应的事件监听器并对其监听方法进行回调,这篇文章主要介绍了Spring事件监听源码解析,需要的朋友可以参考下
    2023-08-08
  • 解析Spring中面向切面编程

    解析Spring中面向切面编程

    如果说 IoC 是 Spring 的核心,那么面向切面编程就是 Spring 最为重要的功能之一了,在数据库事务中切面编程被广泛使用
    2021-06-06
  • Java+MySql图片数据保存与读取的具体实例

    Java+MySql图片数据保存与读取的具体实例

    之前一直没有做过涉及到图片存储的应用,最近要做的东东涉及到了这个点,就做了一个小的例子算是对图片存储的初试吧
    2013-06-06
  • Java中的任务调度框架quartz详细解析

    Java中的任务调度框架quartz详细解析

    这篇文章主要介绍了Java中的任务调度框架quartz详细解析,Quartz 是一个完全由 Java 编写的开源作业调度框架,为在 Java 应用程序中进行作业调度提供了简单却强大的机制,需要的朋友可以参考下
    2023-11-11
  • mybatis类型处理器JSR310标准详解

    mybatis类型处理器JSR310标准详解

    这篇文章主要介绍了mybatis类型处理器JSR310标准详解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-01-01
  • SpringBoot中注解@ConfigurationProperties与@Value的区别与使用详解

    SpringBoot中注解@ConfigurationProperties与@Value的区别与使用详解

    本文主要介绍了SpringBoot中注解@ConfigurationProperties与@Value的区别与使用,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-09-09
  • Java ConcurrentHashMap用法案例详解

    Java ConcurrentHashMap用法案例详解

    这篇文章主要介绍了Java ConcurrentHashMap用法案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • java客户端连接ssh失败问题的两种解决方法

    java客户端连接ssh失败问题的两种解决方法

    有的运维工具使用了java的ssh客户端,这些客户端和服务端间有时会出现加密算法协商失败和主机密钥类型协商失败的问题,该问题是由于新客户端/服务端禁用了相关的不安全算法和密钥类型,本文简要记录下该问题的解决方法以备不时之需
    2026-01-01
  • 详解Java中三种状态机实现方式来优雅消灭 if-else 嵌套

    详解Java中三种状态机实现方式来优雅消灭 if-else 嵌套

    这篇文章主要为大家详细介绍了Java中三种状态机实现方式从而优雅消灭 if-else 嵌套,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下
    2025-08-08
  • 解决使用@Value(${×××))从properties文件取值的坑

    解决使用@Value(${×××))从properties文件取值的坑

    这篇文章主要介绍了解决使用@Value(${×××))从properties文件取值的坑,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-07-07

最新评论