Java lambda表达式实现Flink WordCount过程解析

 更新时间:2020年02月04日 11:55:20   作者:M。  
这篇文章主要介绍了Java lambda表达式实现Flink WordCount过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

这篇文章主要介绍了Java lambda表达式实现Flink WordCount过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下

本篇我们将使用Java语言来实现Flink的单词统计。

代码开发

环境准备

导入Flink 1.9 pom依赖

<dependencies>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.9.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.7</version>
    </dependency>
  </dependencies>

构建Flink流处理环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

自定义source

每秒生成一行文本

DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
      private boolean isCanal = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        // 每秒发送一行文本
        while (!isCanal) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCanal = true;
      }
    });

单词计算

// 3. 单词统计
    // 3.1 将文本行切分成一个个的单词
    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // 切分单词
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

    //3.2 将单词转换为一个个的元组
    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 按照单词进行分组
    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 对每组单词数量进行累加
    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();

参考代码

public class WordCount {
  public static void main(String[] args) throws Exception {
    // 1. 构建Flink流式初始化环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. 自定义source - 每秒发送一行文本
    DataStreamSource<String> wordLineDS = env.addSource(new RichSourceFunction<String>() {
      private boolean isCanal = false;
      private String[] words = {
          "important oracle jdk license update",
          "the oracle jdk license has changed for releases starting april 16 2019",
          "the new oracle technology network license agreement for oracle java se is substantially different from prior oracle jdk licenses the new license permits certain uses such as ",
          "personal use and development use at no cost but other uses authorized under prior oracle jdk licenses may no longer be available please review the terms carefully before ",
          "downloading and using this product an faq is available here ",
          "commercial license and support is available with a low cost java se subscription",
          "oracle also provides the latest openjdk release under the open source gpl license at jdk java net"
      };

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        // 每秒发送一行文本
        while (!isCanal) {
          int randomIndex = RandomUtils.nextInt(0, words.length);
          ctx.collect(words[randomIndex]);
          Thread.sleep(1000);
        }
      }

      @Override
      public void cancel() {
        isCanal = true;
      }
    });

    // 3. 单词统计
    // 3.1 将文本行切分成一个个的单词
    SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // 切分单词
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

    //3.2 将单词转换为一个个的元组
    SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

    // 3.3 按照单词进行分组
    KeyedStream<Tuple2<String, Integer>, String> keyedDS = tupleDS.keyBy(tuple -> tuple.f0);

    // 3.4 对每组单词数量进行累加
    SingleOutputStreamOperator<Tuple2<String, Integer>> resultDS = keyedDS
        .timeWindow(Time.seconds(3))
        .reduce((t1, t2) -> Tuple2.of(t1.f0, t1.f1 + t2.f1));

    resultDS.print();

    env.execute("app");
  }
}

Flink对Java Lambda表达式支持情况

Flink支持Java API所有操作符使用Lambda表达式。但是,但Lambda表达式使用Java泛型时,就需要声明类型信息。

我们来看下上述的这段代码:

SingleOutputStreamOperator<String> wordsDS = wordLineDS.flatMap((String line, Collector<String> ctx) -> {
      // 切分单词
      Arrays.stream(line.split(" ")).forEach(word -> {
        ctx.collect(word);
      });
    }).returns(Types.STRING);

之所以这里将所有的类型信息,因为Flink无法正确自动推断出来Collector中带的泛型。我们来看一下FlatMapFuntion的源代码

@Public
@FunctionalInterface
public interface FlatMapFunction<T, O> extends Function, Serializable {

  /**
  * The core method of the FlatMapFunction. Takes an element from the input data set and transforms
  * it into zero, one, or more elements.
  *
  * @param value The input value.
  * @param out The collector for returning result values.
  *
  * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
  *          to fail and may trigger recovery.
  */
  void flatMap(T value, Collector<O> out) throws Exception;
}

我们发现 flatMap的第二个参数是Collector<O>,是一个带参数的泛型。Java编译器编译该代码时会进行参数类型擦除,所以Java编译器会变成成:

void flatMap(T value, Collector out)

这种情况,Flink将无法自动推断类型信息。如果我们没有显示地提供类型信息,将会出现以下错误:

org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Collector' are missing.
  In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved.
  An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.FlatMapFunction' interface.
  Otherwise the type has to be specified explicitly using type information.

这种情况下,必须要显示指定类型信息,否则输出将返回值视为Object类型,这将导致Flink无法正确序列化。

所以,我们需要显示地指定Lambda表达式的参数类型信息,并通过returns方法显示指定输出的类型信息

我们再看一段代码:

SingleOutputStreamOperator<Tuple2<String, Integer>> tupleDS = wordsDS
        .map(word -> Tuple2.of(word, 1))
        .returns(Types.TUPLE(Types.STRING, Types.INT));

为什么map后面也需要指定类型呢?

因为此处map返回的是Tuple2类型,Tuple2是带有泛型参数,在编译的时候同样会被查出泛型参数信息,导致Flink无法正确推断。

更多关于对Java Lambda表达式的支持请参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/java_lambdas.html

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • 鉴权认证+aop+注解+过滤feign请求的实例

    鉴权认证+aop+注解+过滤feign请求的实例

    这篇文章主要介绍了鉴权认证+aop+注解+过滤feign请求的实例讲解,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-03-03
  • mybatis 查询sql中in条件用法详解(foreach)

    mybatis 查询sql中in条件用法详解(foreach)

    这篇文章主要介绍了mybatis 查询sql中in条件用法详解(foreach),具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2021-02-02
  • JAVA生成pdf文件的实操指南

    JAVA生成pdf文件的实操指南

    最近项目需要实现PDF下载的功能,由于没有这方面的经验,从网上花了很长时间才找到相关的资料,下面这篇文章主要给大家介绍了关于JAVA生成pdf文件的相关资料,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-10-10
  • java中栈和队列的实现和API的用法(详解)

    java中栈和队列的实现和API的用法(详解)

    下面小编就为大家带来一篇java中栈和队列的实现和API的用法(详解)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-05-05
  • Maven中pom.xml文件报错的原因解决

    Maven中pom.xml文件报错的原因解决

    创建Maven项目的时候,如果你选择的Packaging为war,那么就会报错,本文主要介绍了Maven中pom.xml文件报错的原因解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2023-07-07
  • Java 切割字符串的几种方式集合

    Java 切割字符串的几种方式集合

    这篇文章主要介绍了Java 切割字符串的几种方式集合,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-09-09
  • 如何使用java写Student类的功能

    如何使用java写Student类的功能

    这篇文章主要介绍了如何使用java写Student类的功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-04-04
  • Red Hat 安装JDK与IntelliJ IDEA的详细过程

    Red Hat 安装JDK与IntelliJ IDEA的详细过程

    YUM是基于Red Hat的Linux发行版的一个强大而用户友好的包管理工具,这篇文章主要介绍了Red Hat安装JDK与IntelliJ IDEA,需要的朋友可以参考下
    2023-08-08
  • Spring Cloud Alibaba之Sentinel实现熔断限流功能

    Spring Cloud Alibaba之Sentinel实现熔断限流功能

    这篇文章主要介绍了Spring Cloud Alibaba之Sentinel,这里使用阿里的sentinel来实现熔断限流功能,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-04-04
  • springboot接收JSON实现示例解析

    springboot接收JSON实现示例解析

    这篇文章主要为大家介绍了springboot如何接收JSON的实现示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-07-07

最新评论