Spring boot 项目中如何进行kafka Stream app 开发

 更新时间:2025年09月26日 09:28:54   作者:小落的编程笔记  
文章介绍了Kafka Streams的配置要点与核心方法,强调应用ID唯一性、正确设置Bootstrap服务器,区分KStream与Java Stream的不可变性和多消费特性,本文给大家介绍Springboot项目中如何进行kafka Stream app开发,感兴趣的朋友一起看看吧

Kafka Stream

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

下面介绍Spring boot 项目中进行kafka Stream app 开发的详细过程。

1. 导入依赖

      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>3.6.2</version>
      </dependency>

2. 示例代码(伪代码)

这段伪代码只是为了举例设置的场景,业务场景并不一定合适

@Slf4j
@Component
public class MyKafkaStreamProcessor {
	@Value("${spring.kafka.bootstrap-servers}")
	private String bootstrapServers;
	@PostConstruct
	private void init () {
		String appId = "my-kafka-streams-app";
		myKafkaStreams(appId);
		log.info("✅ Kafka Streams:{} 初始化完成,开始监听 topic: {}", appId, "source-topic");
	}
	public void myKafkaStreams(String appId) {
		/*=======配置=======*/
		Properties config  = new Properties();
		config.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
		config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
		StreamsBuilder builder = new StreamsBuilder();
		/*=======构建拓扑结构=======*/
		// 数据清洗
		KStream<String, Order> stream = builder
			.stream("source-topic", Consumed.with(Serdes.String(), new JsonSerde<>(Order.class)))
			.mapValues(value -> {
				// do something
				return value;
			});
		// 过滤出从app创建的订单 并进行处理
		stream.filter((k, v) -> Order.getSource.equals("app"))
			.foreach((k, v) -> {
				// do something
			});
		// 发送到第一个topic
		stream.mapValues(value -> JSON.toJSONString(value), Named.as("to-the-first-target-topic-processor"))
			.to("the-first-target-topic", Produced.with(Serdes.String(), Serdes.String()));
		// 发送到第二个topic
		stream.filter((k, v) -> {
				// filter something
			})
			.mapValues(value -> {
				// map to another object
			}, Named.as("to-the-second-target-topic-processor"))
			.to("the-second-target-topic", Produced.with(Serdes.String(), Serdes.String()));
		/*=======创建KafkaStreams=======*/
		KafkaStreams streams = new KafkaStreams(builder.build(), config);
		/*=======设置异常处理器=======*/
		streams.setUncaughtExceptionHandler(new CustomStreamsUncaughtExceptionHandler());
		/*=======启动streams=======*/
		streams.start();
		/*=======添加jvm hook 确保streams安全退出=======*/
		Runtime.getRuntime().addShutdownHook(new Thread(() -> {
			log.info("关闭 Kafka Streams:{}...", appId);
			streams.close();
			log.info("Kafka Streams:{}已经关闭!", appId);
		}));
	}
}
@Slf4j
public class CustomStreamsUncaughtExceptionHandler implements StreamsUncaughtExceptionHandler {
	/**
	 * Inspect the exception received in a stream thread and respond with an action.
	 *
	 */
	@Override
	public StreamThreadExceptionResponse handle(Throwable throwable) {
		log.error("Kafka Streams 线程发生未捕获异常: {}", ExceptionUtil.stacktraceToString(throwable));
		// 选择处理策略(以下三选一):
		// 1. 替换线程(继续运行)
		return StreamThreadExceptionResponse.REPLACE_THREAD;
		// 2. 关闭整个 Streams 应用
		// return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
		// 3. 关闭整个 JVM
		// return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
	}
}

3. 一些注意事项和说明

  • kafka stream 的处理部分集中在构建的拓扑中,其他部分大同小异
  • 在配置部分 StreamsConfig.APPLICATION_ID_CONFIG 这个参数是必须的,且不能重复,否则会启动失败,StreamsConfig.BOOTSTRAP_SERVERS_CONFIG 是kafka的IP与端口
  • 需要注意的是kafka的 KStream 与 java 中的 stream并不相同,在java中 stream只能被消费一次,但是kstream 可以被消费多次,在上面的demo中可以看到,同一个 kstream 被多次消费,且kstream中的数据是不可变的,也就是无论在上一个处理器(processor)对数据进行了何种处理,下一个处理器从kstream 中获取的数据依旧是原来的数据
  • 在kafka stream app中应该对可能会抛出的异常进行处理,而不是全部交给UncaughtExceptionHandlerUncaughtExceptionHandler应该只处理哪些无法预料的异常
  • 如果kafka stream app 捕获未处理异常之后的处理策略也是替换线程,那么kafka stream app 中如果抛出未捕获异常,那么这个消费者组就会进入再平衡状态(PreparingRebalance),老的消费者从消费者组中剔除,新的消费者加入消费者组,然后再开始消费,注意这种替换线程的处理策略可能导致消息重放,也就是原本的线程消费的offset没有提交导致新的线程会重复消费之前已经被消费的数据,如果业务会因为消息重放出现异常,建议做幂等

4. kafka Stream 的一些方法说明

  • stream()
    • stream()方法是从源topic获取数据的方法,示例中第一个参数是字符串,也就是源topic的名称,
    • 第二个参数是 Consumed ,用来定义对于消息的key与value反序列化的规则,
    • 示例中将key序列化为string, value 序列化为order对象,需要注意的是,如果在配置的config中没有设置适用于整个stream app的序列化与反序列化规则,那么后续的 to()中必须要指定序列化规则
  • filter()
    • filter()的用法与java stream 中的filter()一致,这里不做说明
  • mapValues()
    • mapValues()的用法,是只对消息的value进行操作,比如将value转换为其他对象。这个方法不会对key进行修改
  • map()
    • 与mapValues()类似,但是可以修改消息的key
  • foreach()
    • 与java stream 的 foreach()类似,也是一个终结方法
  • to()
    • 终结方法,用于将数据发送到另外的topic,示例中第一个参数是目标topic, 第二个参数是Produced,用于定义key和value的序列化规则
    • 除了stream()和to()之外,其他方法基本都可以传一个参数 Named,这个参数是为每个处理器节点命名,如果不传则自动生成,但是在同一个kstream中 命名不能重复。这个名称不会影响功能,但是如果有一个名称可以在后续调试和监控中提供一点帮助

到此这篇关于Spring boot 项目中如何进行kafka Stream app 开发的文章就介绍到这了,更多相关Spring boot kafka Stream app 开发内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • Java中 SLF4J和Logback和Log4j和Logging的区别与联系

    Java中 SLF4J和Logback和Log4j和Logging的区别与联系

    这篇文章主要介绍了Java中 SLF4J和Logback和Log4j和Logging的区别与联系,文章通过围绕主题展开详细的内容介绍,具有一定的参考几种,感兴趣的小伙伴可以参考一下
    2022-09-09
  • java多态的向上转型的概念及实例分析

    java多态的向上转型的概念及实例分析

    在本篇内容里小编给大家整理的是一篇关于java多态的向上转型的概念及实例分析,对此有兴趣的朋友们可以跟着学习下。
    2021-05-05
  • Java泛型的类型擦除示例详解

    Java泛型的类型擦除示例详解

    Java泛型(Generic)的引入加强了参数类型的安全性,减少了类型的转换,但有一点需要注意,Java 的泛型在编译器有效,在运行期被删除,也就是说所有泛型参数类型在编译后都会被清除掉,这篇文章主要给大家介绍了关于Java泛型的类型擦除的相关资料,需要的朋友可以参考下
    2021-07-07
  • IDEA 错误之找不到或无法加载主类的问题

    IDEA 错误之找不到或无法加载主类的问题

    这篇文章主要介绍了IDEA 错误之找不到或无法加载主类,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-08-08
  • java中关于控件JTextArea的几个方法

    java中关于控件JTextArea的几个方法

    这篇文章主要介绍了java中关于控件JTextArea的几个方法,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-08-08
  • SpringBoot Mail邮件任务详情

    SpringBoot Mail邮件任务详情

    这篇文章主要介绍了SpringBoot Mail邮件任务详情,文章通过spring-boot-starter-mail包展开详细内容,需要的小伙伴可以参考一下
    2022-05-05
  • 阿里Druid数据连接池引发的线上异常解决

    阿里Druid数据连接池引发的线上异常解决

    这篇文章主要为大家介绍了一次关于阿里Druid数据连接池引发的线上异常问题的解决方案,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步
    2022-03-03
  • SpringMVC如何正确接收时间的请求示例分析

    SpringMVC如何正确接收时间的请求示例分析

    这篇文章主要为大家介绍了SpringMVC如何正确接收时间的请求示例分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-09-09
  • Java Scala之模式匹配与隐式转换

    Java Scala之模式匹配与隐式转换

    在Java中我们有switch case default这三个组成的基础语法,在Scala中我们是有match和case组成 default的作用由case代替,本文详细介绍了Scala的模式匹配与隐式转换,感兴趣的可以参考本文
    2023-04-04
  • SpringBoot拦截器与文件上传实现方法与源码分析

    SpringBoot拦截器与文件上传实现方法与源码分析

    其实spring boot拦截器的配置方式和springMVC差不多,只有一些小的改变需要注意下就ok了。本文主要给大家介绍了关于如何在Springboot实现登陆拦截器与文件上传功能,需要的朋友可以参考下
    2022-10-10

最新评论