springboot使用kafka事务的示例代码

 更新时间:2024年06月14日 09:38:31   作者:​​​​​​​ 五敷有你  
Kafka 同数据库一样支持事务,当发生异常的时候可以进行回滚,确保消息监听器不会接收到一些错误的或者不需要的消息,本文就来介绍一下springboot使用kafka事务的示例代码,具有一定的参考价值,感兴趣的可以了解一下

先看下下面这种情况,程序都出错了,按理说消息也不应该成功

@GetMapping("/send")
public void test9(String message) {
    kafkaTemplate.send(topic, message);
    throw new RuntimeException("fail");
}

但是执行结果是发生了异常并且消息发送成功了:

Kafka 同数据库一样支持事务,当发生异常的时候可以进行回滚,确保消息监听器不会接收到一些错误的或者不需要的消息。

kafka事务属性是指一系列的生产者生产消息和消费者提交偏移量的操作在一个事务,或者说是是一个原子操作),同时成功或者失败。使用事务也很简单,需要先开启事务支持,然后再使用。

如何开启事务

如果使用默认配置只需要在yml添加spring.kafka.producer.transaction-id-prefix配置来开启事务,之前没有使用默认的配置,自定义的kafkaTemplate,那么需要在ProducerFactory中设置事务Id前缀开启事务并将KafkaTransactionManager注入到spring中,看下KafkaProducerConfig完整代码:

@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
 
public Map<String,Object> producerConfigs(){
    Map<String,Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ProducerConfig.RETRIES_CONFIG,retries);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // 配置分区策略
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.example.springbootkafka.config.CustomizePartitioner");
    // 配置生产者拦截器
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.example.springbootkafka.interceptor.CustomProducerInterceptor");
    // 配置拦截器消息处理类
    SendMessageInterceptorUtil sendMessageInterceptorUtil = new SendMessageInterceptorUtil();
    props.put("interceptorUtil",sendMessageInterceptorUtil);
    return props;
}
 
@Bean
public ProducerFactory<String,String> producerFactory(){
    DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory(producerConfigs());
    //设置事务Id前缀 开启事务
    producerFactory.setTransactionIdPrefix("tx-");
    return producerFactory;
}
 
@Bean
public KafkaTemplate<String,String> kafkaTemplate(){
    return new KafkaTemplate<>(producerFactory());
}
 
@Bean
public KafkaTransactionManager<Integer, String> kafkaTransactionManager(ProducerFactory<String, String> producerFactory) {
    return new KafkaTransactionManager(producerFactory);
}
} 

配置开启事务后,使用大体有两种方式,先记录下第一种使用事务方式:使用 executeInTransaction 方法

直接看下代码:

@GetMapping("/send11")
public void test11(String message) {
    kafkaTemplate.executeInTransaction(operations ->{
        operations.send(topic,message);
        throw new RuntimeException("fail");
    });
}

当然你可以这么写:

@GetMapping("/send11")
public void test11(String message) {
    kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback(){
        @Override
        public Object doInOperations(KafkaOperations operations) {
            operations.send(topic,message);
            throw new RuntimeException("fail");
        }
    });
}

启动项目,访问http://localhost:8080/send10?message=test10 结果如下:

如上:消费者没打印消息,说明消息没发送成功,并且前面会报错org.apache.kafka.common.KafkaException: Failing batch since transaction was aborted 的错误,说明事务生效了。

第一种使用事务方式:使用 @Transactional 注解方式 直接在方法上加上@Transactional注解即可,看下代码:

@GetMapping("/send12")
@Transactional
public void test12(String message) {
    kafkaTemplate.send(topic, message);
    throw new RuntimeException("fail");
}

如果开启的事务,则后续发送消息必须使用@Transactional注解或者使用kafkaTemplate.executeInTransaction() ,否则抛出异常,异常信息如下:

贴下完整的异常吧:java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

到此这篇关于springboot使用kafka事务的文章就介绍到这了,更多相关springboot kafka事务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java Annotation注解相关原理代码总结

    Java Annotation注解相关原理代码总结

    这篇文章主要介绍了Java Annotation注解相关原理代码总结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-07-07
  • Java中的这些骚操作你不能不知道!!!

    Java中的这些骚操作你不能不知道!!!

    今天在看python相关的东西,看到各种骚操作,回头想了下Java有没有什么骚操作,整理下面几种,一起看一下吧,希望能给你带来帮助
    2021-07-07
  • 在IDEA中如何设置最多显示文件标签个数

    在IDEA中如何设置最多显示文件标签个数

    在使用IDEA进行编程时,可能会同时打开多个文件,当文件过多时,文件标签会占据大部分的IDEA界面,影响我们的编程效率,因此,我们可以通过设置IDEA的文件标签显示个数,来优化我们的编程环境,具体的设置方法如下
    2024-10-10
  • 解决Spring MVC中文乱码的编码配置

    解决Spring MVC中文乱码的编码配置

    这篇文章主要为大家介绍了解决SpringMVC中文乱码的编码配置示例详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • Spring Cache的基本使用与实现原理详解

    Spring Cache的基本使用与实现原理详解

    缓存是实际工作中非经常常使用的一种提高性能的方法, 我们会在很多场景下来使用缓存。下面这篇文章主要给大家介绍了关于Spring Cache的基本使用与实现原理的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下
    2018-05-05
  • Java使用easyExcel批量导入数据详解

    Java使用easyExcel批量导入数据详解

    这篇文章主要介绍了Java使用easyExcel批量导入数据详解,通常我们会提供一个模板,此模块我们可以使用easyExcel导出数据生成的一个Excel文件当作模板,提供下载链接,用户在该文件内填入规定的数据格式以后可以批量导入数据到数据库中,需要的朋友可以参考下
    2023-08-08
  • gateway和jwt网关认证实现过程解析

    gateway和jwt网关认证实现过程解析

    这篇文章主要介绍了gateway和jwt网关认证实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-11-11
  • 简单谈谈Java类与类之间的关系

    简单谈谈Java类与类之间的关系

    类与类之间的关系对于理解面向对象具有很重要的作用,以前在面试的时候也经常被问到这个问题,在这里我就简单给大家介绍一下。
    2016-05-05
  • 使用SpringBoot + Redis + Vue实现动态路由加载页面的示例代码

    使用SpringBoot + Redis + Vue实现动态路由加载页面的示例代

    在现代 Web 应用开发中,动态路由加载能够显著提升应用的灵活性和安全性,本文将深入探讨如何利用 Spring Boot、Redis、Element UI 和 Vue 技术栈实现动态路由加载,并通过 Redis 生成和验证有效链接以实现页面访问控制,需要的朋友可以参考下
    2024-09-09
  • Java MD5加密工具类的方法(支持多参数输入)

    Java MD5加密工具类的方法(支持多参数输入)

    在实际开发过程中,MD5加密是一种常见的数据安全处理手段,常用于密码存储、数据完整性校验等场景,这篇文章主要介绍了Java MD5加密工具类(支持多参数输入),需要的朋友可以参考下
    2024-05-05

最新评论