Spark-Sql入门程序示例详解

 更新时间:2021年12月03日 11:49:24   作者:山不在高水不在深  
Spark SQL 作为 Spark 四大核心组件之一,主要用于处理结构化数据或半结构化数据,它支持在Spark 中使用 SQL 对数据进行查询,本文给大家介绍Spark-Sql入门程序,感兴趣的朋友跟随小编一起看看吧

SparkSQL运行架构

Spark SQL对SQL语句的处理,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。

spark-sql是用来处理结构化数据的模块,是入门spark的首要模块。

技术的学习无非就是去了解它的API,但是Spark有点难,因为它的例子和网上能搜到的基本都是Scala写的。我们这里使用Java。

入门例子

数据处理的第一个例子通常都是word count,就是统计一个文件里每个单词出现了几次。我们也来试一下。

> 这个例子网上有很多,即使是通过spark实现的也不少;这里面大部分都是使用Scala写的,我没有试过;少部分是通过Java写的;

Java里面的例子有一些是使用RDD实现的,只有极个别是通过DataSet来做的。但即使这一小撮例子,我也跑不通。

所以我自己来尝试完成这个例子,看到别人用Scala写三五行就完成了,而我尝试了一整天几无进展。在网上东拼西凑熟悉Spark的Java 

还是以我们前面的例子来改:

String logFile = "words";
SparkSession spark = SparkSession.builder().appName("Simple Application").master("local").getOrCreate();
Dataset<String> logData = spark.read().textFile(logFile).cache();

System.out.println("行数:" + logData.count());这里我不再使用之前的README文件,自己创建了一个words文件,内容随意写了一堆单词。

执行程序,可以正常打印出来:

接下来我们需要把句子分割成一个个单词合在一起,然后统计每个单词出现的次数。

> 可能有人会说,这个简单,我用Java8的流一下就处理好了:

把行集合通过flatMap处理,每一行通过split(" ")分割成一个独立的单词集合,再把结果通过自身groupBy一下就拿到终止数据结构Map了。

最后把map的key和value的大小拿到就好了。

的确,使用Java就是这样实现。但是Spark提供了一套和Java的流API名字和效果类似的工具,区别是Spark的是分布式API

我们通过Spark的flatMap先来处理一下:

Dataset<String> words = logData.flatMap((FlatMapFunction<String, String>) k -> Arrays.asList(k.split("\\s")).iterator(), Encoders.STRING());
System.out.println("单词数:" + words.count());
words.foreach(k -> {
System.out.println("W:" + k);
});

不同于Java的流,spark这个flatMap的返回值是可以直接访问结果的:

> 可能有人留意到spark中函数式方法的参数定义和Java差距较大。他们的参数不太一样,还多了个编码器。目前来讲我还不清楚为啥这样定义,不过印象中编码器也是spark3的重要优化内容。

再Java中使用Scala的方法总是有些怪异,Lambda表达式前面总是需要强制类型转换,只是为了指明参数类型,否则需要new一个匿名类。

这个也花了我不少时间,后来找到一个网页org.apache.spark.sql.Dataset.flatMap java code examples | Tabnine

再往后我迷茫了:

KeyValueGroupedDataset<String, String> group = words.groupByKey((Function1<String, String>) k -> k, Encoders.STRING());

这样我已经group好了,但是返回的不是DataSet,我也不知道这个返回有啥用,怎么拿到里面的内容呢?我费了好大劲没搞定。

比如我发现count方法会返回一个DataSet:

看起来正是我想要的,但是当我想把它输出竟然执行报错:

ount.foreach(t -> {
    System.out.println(t);
});

别说foreach了,就算想看看里面的数量(就像一开始我们查看了文件有几行那样)都会报错,错误内容一样

count.count();

查了很多资料,大意是说spark的计算方法都是分布式的,各个任务之间需要通信,通信时需要序列化来传递信息。所以上面我们能看文件行数因为类型是String,有序列化标志;现在生成的是元组,不能序列化。我尝试了各种方法,甚至自己创建新类模拟了计算过程还是不行

查了好久资料,比如Job aborted due to stage failure: Task not serializable: | Databricks Spark Knowledge Base (gitbooks.io)依然没有解决。偶然的机会找到一个令人激动的网站Spark Groupby Example with DataFrame — SparkByExamples终于解决了我的问题。

使用DataFrame

DataFrame虽然是spark提供的重要工具,但是再Java上并没有对应的类,只是把DataSet的泛型对象改成Row而已。注意这个Row没有泛型定义,所以里面有哪些列不知道

可以从一开始就把DataSet转成DataFrame:

但是可以看到要从Row里面拿数据比较麻烦。所以目前我只在需要序列化的地方转:

到此这篇关于Spark-Sql入门程序的文章就介绍到这了,更多相关Spark-Sql入门内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • 详解Springboot对多线程的支持

    详解Springboot对多线程的支持

    Spring是通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用ThreadPoolTaskExecutor来创建一个基于线城池的TaskExecutor。这篇文章给大家介绍Springboot对多线程的支持,感兴趣的朋友一起看看吧
    2018-07-07
  • 使用JMX连接JVM实现过程详解

    使用JMX连接JVM实现过程详解

    这篇文章主要介绍了使用JMX连接JVM实现过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-04-04
  • MyBatis动态SQL之<choose><when><otherwise>标签的使用

    MyBatis动态SQL之<choose><when><otherwise>标签的使用

    MyBatis中动态语句choose-when-otherwise 类似于Java中的switch-case-default语句,本文就来介绍一下MyBatis动态SQL之<choose><when><otherwise>标签的使用,感兴趣的可以了解一下
    2023-09-09
  • java IO流之转换流的具体使用

    java IO流之转换流的具体使用

    转换流可以将一个字节流包装成字符流,或者将一个字符流包装成字节流,本文主要介绍了java IO流之转换流的具体使用,具有一定的参考价值,感兴趣的可以了解一下
    2024-01-01
  • Spring Boot中使用RabbitMQ的示例代码

    Spring Boot中使用RabbitMQ的示例代码

    本篇文章主要介绍了Spring Boot中使用RabbitMQ的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-04-04
  • idea推送项目到gitee中的创建方法

    idea推送项目到gitee中的创建方法

    这篇文章主要介绍了idea推送项目到gitee中的创建方法,本文通过图文实例相结合给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2021-08-08
  • jackson json序列化实现首字母大写,第二个字母需小写

    jackson json序列化实现首字母大写,第二个字母需小写

    这篇文章主要介绍了jackson json序列化实现首字母大写,第二个字母需小写方式,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • LCN分布式事务解决方案详解

    LCN分布式事务解决方案详解

    这篇文章主要介绍了LCN分布式事务解决方案详解,本篇文章通过简要的案例,讲解了该项技术的了解与使用,以下就是详细内容,需要的朋友可以参考下
    2021-08-08
  • Java反射机制的适用场景及利弊详解

    Java反射机制的适用场景及利弊详解

    这篇文章主要介绍了Java反射机制的适用场景及利弊详解,Spring用到很多反射机制,在xml文件或者properties里面写好了配置,然后在Java类里面解析xml或properties里面的内容,得到一个字符串,然后用反射机制,需要的朋友可以参考下
    2023-08-08
  • spring boot整合kafka过程解析

    spring boot整合kafka过程解析

    这篇文章主要介绍了spring boot整合kafka过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-02-02

最新评论