消息队列-kafka消费异常问题

 更新时间:2021年07月02日 10:47:47   作者:秃头披风侠_  
这篇文章主要给大家介绍了关于kafka的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

概述

在kafka中,或者是说在任何消息队列中都有个消费顺序的问题。为了保证一个队列顺序消费,当当中一个消息消费异常时,必将影响后续队列消息的消费,这样业务岂不是卡住了。比如笔者举个最简单的例子:我发送1-100的消息,在我的处理逻辑当中 msg%5==0我就进行 int i=1/0操作,这必将抛异常,一直阻塞在msg=5上,后面6-100无法消费。下面笔者给出解决方案。

重试一定次数(消息丢失)

@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test6(String msg){
              businessProcess(msg);
            }
           private void businessProcess(String msg){
        System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
       if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }

说明:如果读者使用的是java客户端,也就是spring进行实现,那么在不做任何处理的情况下,会自动重试10次,然后消息会被直接处理掉。也就是说如果你的业务允许消息丢失,那么你不需要额外的编码处理

加入到死讯队列(消息不丢失)

消费端代码:

//1.启用手动提交offset
//2.配置errorHandler,用来加入到死讯队列
//3.不管业务处理是否处理异常还是正常都提交offset
@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2",
            errorHandler ="kafkaListenerErrorHandler", concurrency = "1")
    public void test6(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
        }finally {
            //手动提交
            ack.acknowledge();
        }
    }
//1.专门处理死讯队列消息,都是topicName+.DLT的主题
//2.死讯队列里,只有消费成功的才提交offset,否则等待bug修复完上线,继续处理
    @KafkaHandler
    @KafkaListener(topics = {"quickstart-events.DLT"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test7(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
            ack.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
//业务代码
    private void businessProcess(String msg){
        System.out.println("接收到消息:" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
        if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }

异常处理器

//1.向容器注册一个KafkaListenerErrorHandler类型的bean
//2.该bean就是当处理消息异常的时候,将消息加入到.DLT主题中
@Component("kafkaListenerErrorHandler")
public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler {
   @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    private static final String TOPIC_DLT=".DLT";
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        System.out.println("消费失败消息:"+message.toString());
        //获取消息处理异常主题
        MessageHeaders headers = message.getHeaders();
        String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;
        //放入死讯队列
        kafkaTemplate.send(topic,message.getPayload());
        return message;
    }
}

效果图

image.png 

说明:以上基本上就是使用死讯队列的方案,也许读者会觉得这样编码复杂度很高,但其实不用担心,其实上面这些代码基本上是使用死讯队列的模板代码,在成熟一点的公司,一般会使用上述代码进行简单封装,这里笔者给个思路,有兴趣同学可以实现一下。我们其实可以使用aop思想,进行自定义一个@EnableDLT这样的注解去实现,这样上面这个方案使用起来是不是就简单优雅了。之前笔者在开发过程中使用过亚马逊的消息队列服务,也不过是这样实现罢了。

总结

本篇文章就到这里了,希望可以给你带来一些帮助,也希望您能够多多关注脚本之家的更多内容!

相关文章

  • SpringBoot之如何正确、安全的关闭服务

    SpringBoot之如何正确、安全的关闭服务

    这篇文章主要介绍了SpringBoot之如何正确、安全的关闭服务问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03
  • Java的stream流多个字段排序的实现

    Java的stream流多个字段排序的实现

    本文主要介绍了Java的stream流多个字段排序的实现,主要是两种方法,第一种是固定多个字段排序和第二种动态字段进行排序,具有一定的参考价值,感兴趣的可以了解一下
    2023-10-10
  • JAVA中Object的常用方法

    JAVA中Object的常用方法

    JAVA中Object是所有对象的顶级父类,存在于java.lang包中,这个包不需要我们手动导包,本文通过实例代码介绍JAVA中Object的常用方法,感兴趣的朋友一起看看吧
    2023-11-11
  • 基于spring mvc请求controller访问方式

    基于spring mvc请求controller访问方式

    这篇文章主要介绍了spring mvc请求controller访问方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • 详解如何给SpringBoot部署的jar包瘦身

    详解如何给SpringBoot部署的jar包瘦身

    这篇文章主要介绍了如何给SpringBoot部署的jar包瘦身,如今迭代发布是常有的事情,每次都上传一个如此庞大的文件,会浪费很多时间,接下来小编就以一个小项目为例,来演示如何给jar包瘦身,需要的朋友可以参考下
    2023-07-07
  • SpringBoot中RabbitMQ集群的搭建详解

    SpringBoot中RabbitMQ集群的搭建详解

    单个的 RabbitMQ 肯定无法实现高可用,要想高可用,还得上集群。这篇文章主要介绍了SpringBoot中RabbitMQ集群的两种模式的搭建:普通集群搭建和镜像集群搭建,需要的朋友可以参考一下
    2021-12-12
  • Java中Swing类实例讲解

    Java中Swing类实例讲解

    这篇文章主要介绍了Java中Swing类实例讲解,文中用代码实例讲解的很清楚,有需要的同学可以研究下
    2021-02-02
  • Java RabbitMQ消息队列详解常见问题

    Java RabbitMQ消息队列详解常见问题

    消息队列是最古老的中间件之一,从系统之间有通信需求开始,就自然产生了消息队列。本文告诉什么是消息队列,为什么需要消息队列,常见的消息队列有哪些,RabbitMQ的部署和使用
    2022-07-07
  • java微信红包实现算法

    java微信红包实现算法

    这篇文章主要为大家详细介绍了java微信红包实现算法,列出红包的核心算法,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2017-02-02
  • Java嵌入式开发的优势及有点总结

    Java嵌入式开发的优势及有点总结

    在本篇内容里小编给大家整理了关于Java嵌入式开发的优势及相关知识点内容,有兴趣的朋友们学习下。
    2022-11-11

最新评论