Kafka整合WebFlux实践

 更新时间:2026年03月06日 15:25:28   作者:西红柿系番茄  
文章介绍了如何在Kafka中整合WebFlux,包括引入依赖和代码示例,并对相关知识进行了总结

Kafka整合WebFlux

1、引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor.kafka</groupId>
    <artifactId>reactor-kafka</artifactId>
    <version>1.1.0.RELEASE</version>
</dependency>

2、代码示例

@Component
public class KafkaService {

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private KafkaSender<String, String> kafkaSender;
    private KafkaReceiver<String, String> kafkaReceiver;

    @PostConstruct
    public void init() {
        final Map<String, Object> producerProps = new HashMap<>();
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        final SenderOptions<String, String> producerOptions = SenderOptions.create(producerProps);
        this.kafkaSender = KafkaSender.create(producerOptions);

        final Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "payment-validator-1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "payment-validator");
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ReceiverOptions<String, String> consumerOptions = ReceiverOptions.<String, String>create(consumerProps)
                .subscription(Collections.singleton("demo"))
                .addAssignListener(partitions -> System.out.println("onPartitionsAssigned " + partitions))
                .addRevokeListener(partitions -> System.out.println("onPartitionsRevoked " + partitions));
        KafkaReceiver<String, String> kafkaReceiver = KafkaReceiver.create(consumerOptions);
        kafkaReceiver.receive().doOnNext(r -> {
            System.out.println(r.value());
            r.receiverOffset().acknowledge();
        }).subscribe();
        this.kafkaReceiver = kafkaReceiver;
    }

    public Mono< ?> send() {
        SenderRecord<String, String, Object> senderRecord = SenderRecord.create(new ProducerRecord<>("demo", value()), 1);
        return kafkaSender.send(Mono.just(senderRecord)).next();
    }

    private String value() {
        Map<String, String> map = new HashMap<>();
        map.put("name", UUID.randomUUID().toString());
        try {
            return OBJECT_MAPPER.writeValueAsString(map);
        } catch (JsonProcessingException e) {
            return "{}";
        }
    }
}

3、其它

server:
  port: 8888

spring:
  jackson:
    serialization:
      FAIL_ON_EMPTY_BEANS: false

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • eclipse报错 eclipse启动报错解决方法

    eclipse报错 eclipse启动报错解决方法

    本文将介绍eclipse启动报错解决方法,需要了解的朋友可以参考下
    2012-11-11
  • java8 实现提取集合对象的每个属性

    java8 实现提取集合对象的每个属性

    这篇文章主要介绍了java8 实现提取集合对象的每个属性方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02
  • Golang Protocol Buffer案例详解

    Golang Protocol Buffer案例详解

    这篇文章主要介绍了Golang Protocol Buffer案例详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • Java中的StopWatch计时利器使用指南

    Java中的StopWatch计时利器使用指南

    StopWatch通常用于测量一段代码执行所花费的时间,它能够精确地记录开始时间、结束时间,并计算出这中间的时间差,下面给大家介绍Java中的StopWatch计时利器的深度解析与使用指南,感兴趣的朋友一起看看吧
    2025-05-05
  • Spring Boot中记录用户系统操作流程

    Spring Boot中记录用户系统操作流程

    这篇文章主要介绍了如何在Spring Boot中记录用户系统操作流程,将介绍如何在Spring Boot中使用AOP(面向切面编程)和日志框架来实现用户系统操作流程的记录,需要的朋友可以参考下
    2023-07-07
  • idea中Tomcat启动失败的解决

    idea中Tomcat启动失败的解决

    这篇文章主要介绍了idea中Tomcat启动失败的解决,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2024-09-09
  • Mybatis 动态SQL的几种实现方法

    Mybatis 动态SQL的几种实现方法

    这篇文章主要介绍了Mybatis 动态SQL的几种实现方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-11-11
  • Java Kafka实现延迟队列的示例代码

    Java Kafka实现延迟队列的示例代码

    kafka作为一个使用广泛的消息队列,很多人都不会陌生。本文将利用Kafka实现延迟队列,文中的示例代码讲解详细,感兴趣的小伙伴可以尝试一下
    2022-08-08
  • Java图论的两个基本概念之有向图与无向图详解

    Java图论的两个基本概念之有向图与无向图详解

    图论是数学的一个基本分支,涉及对图研究,图是复杂数据结构的可视化表示,有助于理解不同实体之间的关系,这篇文章主要介绍了Java图论的两个基本概念之有向图与无向图的相关资料,需要的朋友可以参考下
    2026-03-03
  • java使用Jco连接SAP过程

    java使用Jco连接SAP过程

    这篇文章主要介绍了java使用Jco连接SAP过程,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2023-03-03

最新评论