java发送kafka事务消息的实现方法

 更新时间:2022年07月15日 09:52:58   作者:逆风飞翔的小叔  
本文主要介绍了java发送kafka事务消息的实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

前言

事务对java开发的同学来说并不陌生,我们使用事务的目的在于避免产生重复数据或者说利用数据存储中间件的事务特性确保数据的精准性,比如大家熟悉的mysql,我们在程序开始时,只需要在程序中添加上事务注解即可

kafka客户端事务,直接使用客户端提供的相关的API即可,和jdbc事务的使用很类似,主要包含下面5个API

// 1 初始化事务
void initTransactions();


// 2 开启事务
void beginTransaction() throws ProducerFencedException;


// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws ProducerFencedException;


// 4 提交事务
void commitTransaction() throws ProducerFencedException;


// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

下面结合实际的代码以及效果演示进行说明

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class ProducerTransaction {
 
    public static void main(String[] args) throws Exception {
 
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();
        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
 
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
 
        // 设置事务 id(必须),事务 id 任意起名
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
 
        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
 
        // 初始化事务
        kafkaProducer.initTransactions();
        // 开启事务
        kafkaProducer.beginTransaction();
        System.out.println("开始发送消息");
        try {
            // 4. 调用 send 方法,发送消息
            for (int i = 0; i < 5; i++) {
                // 发送消息
                kafkaProducer.send(new ProducerRecord<>("zcy222", "hello kafka " + i));
            }
            //int i = 1 / 0;
            // 提交事务
            kafkaProducer.commitTransaction();
        } catch (Exception e) {
            System.out.println(e);
            // 终止事务
            kafkaProducer.abortTransaction();
        } finally {
            // 5. 关闭资源
            kafkaProducer.close();
        }
    }
 
}

运行上面的代码,正常是可以发送到指定的topic下

接下来,我们将上面的代码中的 1/0 放开,再次运行程序,可以看到,程序中抛异常了,但是消息并没有发送到kafka的broker,说明事务的配置生效了

 到此这篇关于java发送kafka事务消息的实现方法的文章就介绍到这了,更多相关java发送kafka事务消息内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Spring 使用JavaConfig实现配置的方法步骤

    Spring 使用JavaConfig实现配置的方法步骤

    这篇文章主要介绍了Spring 使用JavaConfig实现配置的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-01-01
  • 详解Java删除Map中元素java.util.ConcurrentModificationException”异常解决

    详解Java删除Map中元素java.util.ConcurrentModificationException”异常解决

    这篇文章主要介绍了详解Java删除Map中元素java.util.ConcurrentModificationException”异常解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-01-01
  • JSP 开发之 releaseSession的实例详解

    JSP 开发之 releaseSession的实例详解

    这篇文章主要介绍了JSP 开发之 releaseSession的实例详解的相关资料,需要的朋友可以参考下
    2017-07-07
  • SpringBoot自定义starter方式

    SpringBoot自定义starter方式

    本文介绍了如何创建一个自定义的Spring Boot Starter,以实现日志功能,通过使用SPI机制,可以在不修改启动类的情况下,实现自动配置和功能导入,同时,还讨论了如何在自定义Starter中编写必要的配置文件和注解,以确保功能的正确实现和配置的智能提示
    2025-02-02
  • 关于application.yml数据库配置方式

    关于application.yml数据库配置方式

    这篇文章主要介绍了关于application.yml数据库配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-08-08
  • IDEA插件Statistic统计代码快速分辨烂项目

    IDEA插件Statistic统计代码快速分辨烂项目

    这篇文章主要为大家介绍了使用IDEA插件Statistic来统计项目代码,帮助大家快速识别出烂项目,有需要的朋友可以借鉴参考下,希望能够有所帮助
    2022-01-01
  • Spring Session(分布式Session共享)实现示例

    Spring Session(分布式Session共享)实现示例

    这篇文章主要介绍了Spring Session(分布式Session共享)实现示例,文章内容详细,需要的朋友可以参考下
    2023-01-01
  • 基于JPA的Repository使用详解

    基于JPA的Repository使用详解

    这篇文章主要介绍了JPA的Repository使用详解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • Spring IOC容器Bean注解创建对象组件扫描

    Spring IOC容器Bean注解创建对象组件扫描

    这篇文章主要为大家介绍了Spring IOC容器Bean注解创建对象组件扫描,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05
  • Spring AOP里的静态代理和动态代理用法详解

    Spring AOP里的静态代理和动态代理用法详解

    这篇文章主要介绍了 Spring AOP里的静态代理和动态代理用法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-07-07

最新评论