基于SpringBoot 使用 Flink 收发Kafka消息的示例详解

 更新时间:2023年01月07日 09:12:25   作者:dk168  
这篇文章主要介绍了基于SpringBoot 使用 Flink 收发Kafka消息,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

前言

这周学习下Flink相关的知识,学习到一个读写Kafka消息的示例, 自己动手实践了一下,别人示例使用的是普通的Java Main方法,没有用到spring boot. 我们在实际工作中会使用spring boot。 因此我做了些加强, 把流程打通了,过程记录下来。

准备工作

首先我们通过docker安装一个kafka服务,参照Kafka的官方知道文档
https://developer.confluent.io/tutorials/kafka-console-consumer-producer-basics/kafka.html主要的是有个docker-compose.yml文件

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

docker compose up -d
就可以把kafka docker 环境搭起来,
使用以下命令,创建一个flink.kafka.streaming.source的topic
docker exec -t broker kafka-topics --create --topic flink.kafka.streaming.source --bootstrap-server broker:9092
然后使用命令,就可以进入到kafka机器的命令行
docker exec -it broker bash
官方文档示例中没有-it, 运行后没有进入broker的命令行,加上来才可以。这里说明下

Flink我们打算直接采用开发工具运行,暂时未搭环境,以体验为主。

开发阶段

首先需要引入的包POM文件

    <properties>
        <jdk.version>1.8</jdk.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-boot.version>2.7.7</spring-boot.version>
        <flink.version>1.16.0</flink.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

这里我们使用Java8, 本来想使用Spring Boot 3的,但是Spring Boot 3 最低需要Java17了, 目前Flink支持Java8和Java11,所以我们使用Spring Boot 2, Java 8来开发。

spring-boot-starter 我们就一个命令行程序,所以用这个就够了
lombok 用来定义model
flink-java, flink-clients, flink-streaming-java 是使用基本组件, 缺少flink-clients编译阶段不会报错,运行的时候会报java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
flink-connector-kafka 是连接kafka用
我们这里把provided, 打包的时候不用打包flink相关组件,由运行环境提供。但是IDEA运行的时候会报java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/DeserializationSchema,
在运行的configuration上面勾选上“add dependencies with provided scope to classpath”可以解决这个问题。

主要代码

@Component
@Slf4j
public class KafkaRunner implements ApplicationRunner
{
    @Override
    public void run(ApplicationArguments args) throws Exception {
        try{

            /****************************************************************************
             *                 Setup Flink environment.
             ****************************************************************************/

            // Set up the streaming execution environment
            final StreamExecutionEnvironment streamEnv
                    = StreamExecutionEnvironment.getExecutionEnvironment();

            /****************************************************************************
             *                  Read Kafka Topic Stream into a DataStream.
             ****************************************************************************/

            //Set connection properties to Kafka Cluster
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:29092");
            properties.setProperty("group.id", "flink.learn.realtime");

            //Setup a Kafka Consumer on Flnk
            FlinkKafkaConsumer<String> kafkaConsumer =
                    new FlinkKafkaConsumer<>
                            ("flink.kafka.streaming.source", //topic
                                    new SimpleStringSchema(), //Schema for data
                                    properties); //connection properties

            //Setup to receive only new messages
            kafkaConsumer.setStartFromLatest();

            //Create the data stream
            DataStream<String> auditTrailStr = streamEnv
                    .addSource(kafkaConsumer);

            //Convert each record to an Object
            DataStream<Tuple2<String, Integer>> userCounts
                    = auditTrailStr
                    .map(new MapFunction<String,Tuple2<String,Integer>>() {

                        @Override
                        public Tuple2<String,Integer> map(String auditStr) {
                            System.out.println("--- Received Record : " + auditStr);
                            AuditTrail at = new AuditTrail(auditStr);
                            return new Tuple2<String,Integer>(at.getUser(),at.getDuration());
                        }
                    })

                    .keyBy(0)  //By user name
                    .reduce((x,y) -> new Tuple2<String,Integer>( x.f0, x.f1 + y.f1));

            //Print User and Durations.
            userCounts.print();

            /****************************************************************************
             *                  Setup data source and execute the Flink pipeline
             ****************************************************************************/
            //Start the Kafka Stream generator on a separate thread
            System.out.println("Starting Kafka Data Generator...");
            Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());
            kafkaThread.start();

            // execute the streaming pipeline
            streamEnv.execute("Flink Windowing Example");

        }
        catch(Exception e) {
            e.printStackTrace();
        }
    }
}

简单说明下程序
DataStream auditTrailStr = streamEnv
.addSource(kafkaConsumer);
就是接通了Kafka Source

        Thread kafkaThread = new Thread(new KafkaStreamDataGenerator());
        kafkaThread.start();

这段代码是另外开一个线程往kafka里面去发送文本消息
我们在这个示例中就是一个线程发,然后flink就读出来,然后统计出每个用户的操作时间。
auditTrailStr.map 就是来进行统计操作。

运行效果

可以看到Kafka一边发送,然后我们就一边读出来,然后就统计出了每个用户的时间。

总结

本文只是简单的打通了几个环节,对于flink的知识没有涉及太多,算是一个环境入门。后面学习更多的以后我们再深入些来记录flink. 示例代码会放到 https://github.com/dengkun39/redisdemo.git spring-boot-flink 文件夹。

相关文章

  • Java使用CompletableFuture进行非阻塞IO详解

    Java使用CompletableFuture进行非阻塞IO详解

    这篇文章主要介绍了Java使用CompletableFuture进行非阻塞IO详解,CompletableFuture是Java中的一个类,用于支持异步编程和处理异步任务的结果,它提供了一种方便的方式来处理异步操作,并允许我们以非阻塞的方式执行任务,需要的朋友可以参考下
    2023-09-09
  • SpringBoot过滤器如何获取POST请求的JSON参数

    SpringBoot过滤器如何获取POST请求的JSON参数

    这篇文章主要介绍了SpringBoot过滤器如何获取POST请求的JSON参数操作,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-08-08
  • spring aop底层源码执行逻辑剖析(源码解析)

    spring aop底层源码执行逻辑剖析(源码解析)

    这篇文章主要介绍了spring aop底层源码执行逻辑剖析,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2024-08-08
  • springboot集成shiro遭遇自定义filter异常的解决

    springboot集成shiro遭遇自定义filter异常的解决

    这篇文章主要介绍了springboot集成shiro遭遇自定义filter异常的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-11-11
  • Java手动实现Redis的LRU缓存机制

    Java手动实现Redis的LRU缓存机制

    在Java中LRU的实现方式是使用HashMap结合双向链表,HashMap的值是双向链表的节点,双向链表的节点也保存一份key value。
    2021-05-05
  • 用拦截器修改返回response,对特定的返回进行修改操作

    用拦截器修改返回response,对特定的返回进行修改操作

    这篇文章主要介绍了用拦截器修改返回response,对特定的返回进行修改操作。具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2020-09-09
  • 浅析Alibaba Nacos注册中心源码剖析

    浅析Alibaba Nacos注册中心源码剖析

    这篇文章主要介绍了浅析Alibaba Nacos注册中心源码剖析,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-05-05
  • SpringBoot集成iTextPDF的实例

    SpringBoot集成iTextPDF的实例

    SpringBoot集成iTextPDF时,创建PDF文档涉及Document、PdfPTable和PdfPCell对象,设置文档大小和页边距,使用Paragraph设置段落样式,并通过Table和Cell控制表格样式和对齐,还可加入图片美化文档,这些步骤对于生成具有中文内容的PDF文件至关重要
    2024-09-09
  • 以Java代码为例讲解设计模式中的简单工厂模式

    以Java代码为例讲解设计模式中的简单工厂模式

    简单来说,工厂模式就是按照需求来返回一个类型的对象,使用工厂模式的意义就是,如果对象的实例化与代码依赖太大的话,不方便进行扩展和维护,使用工厂的目的就是使对象的实例化与主程序代码就行解耦.来具体看一下:
    2016-05-05
  • Spring Boot使用过滤器和拦截器分别实现REST接口简易安全认证示例代码详解

    Spring Boot使用过滤器和拦截器分别实现REST接口简易安全认证示例代码详解

    这篇文章主要介绍了Spring Boot使用过滤器和拦截器分别实现REST接口简易安全认证示例代码,通过开发实践,理解过滤器和拦截器的工作原理,需要的朋友可以参考下
    2018-06-06

最新评论