如何使用IDEA开发Spark SQL程序(一文搞懂)

 更新时间:2021年08月10日 11:12:11   作者:小哪吒的BD  
Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。这篇文章主要介绍了如何使用IDEA开发Spark SQL程序(一文搞懂),需要的朋友可以参考下

前言

大家好,我是DJ丶小哪吒,我又来跟你们分享知识了。对软件开发有着浓厚的兴趣。喜欢与人分享知识。做博客的目的就是为了能与 他 人知识共享。由于水平有限。博客中难免会有一些错误。如有 纰漏 之处,欢迎大家在留言区指正。小编也会及时改正。

DJ丶小哪吒又来与各位分享知识了。今天我们不飙车,今天就静静的坐下来,我们来聊一聊关于sparkSQL。准备好茶水,听老朽与你娓娓道来。

Spark SQL是什么

Spark SQL 是一个用来处理结构化数据的spark组件。它提供了一个叫做DataFrames的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

1、使用IDEA开发Spark SQL

Spark会根据文件信息尝试着去推断DataFrame/DataSet的Schema,当然我们也可以手动指定,手动指定的方式有以下几种:

  • 第1种:指定列名添加Schema
  • 第2种:通过StructType指定Schema
  • 第3种:编写样例类,利用反射机制推断Schema

 1.1、指定列名添加Schema

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


object CreateDFDS {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[(Int, String, Int)] = linesRDD.map(line =>(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    val personDF: DataFrame = rowRDD.toDF("id","name","age")
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.2、通过StructType指定Schema

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SparkSession}


object CreateDFDS2 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Row] = linesRDD.map(line =>Row(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    //import spark.implicits._
    val schema: StructType = StructType(Seq(
      StructField("id", IntegerType, true),//允许为空
      StructField("name", StringType, true),
      StructField("age", IntegerType, true))
    )
    val personDF: DataFrame = spark.createDataFrame(rowRDD,schema)
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.3、反射推断Schema–掌握

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


object CreateDFDS3 {
case class Person(id:Int,name:String,age:Int)
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    val personDF: DataFrame = rowRDD.toDF
    personDF.show(10)
    personDF.printSchema()
    sc.stop()
    spark.stop()
  }
}

1.4、花式查询

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


object QueryDemo {
case class Person(id:Int,name:String,age:Int)
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL")
.getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val rowRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    val personDF: DataFrame = rowRDD.toDF
    personDF.show(10)
    personDF.printSchema()
    //=======================SQL方式查询=======================
    //0.注册表
    personDF.createOrReplaceTempView("t_person")
    //1.查询所有数据
    spark.sql("select * from t_person").show()
    //2.查询age+1
    spark.sql("select age,age+1 from t_person").show()
    //3.查询age最大的两人
    spark.sql("select name,age from t_person order by age desc limit 2").show()
    //4.查询各个年龄的人数
    spark.sql("select age,count(*) from t_person group by age").show()
    //5.查询年龄大于30的
    spark.sql("select * from t_person where age > 30").show()

    //=======================DSL方式查询=======================
    //1.查询所有数据
    personDF.select("name","age")
    //2.查询age+1
    personDF.select($"name",$"age" + 1)
    //3.查询age最大的两人
    personDF.sort($"age".desc).show(2)
    //4.查询各个年龄的人数
    personDF.groupBy("age").count().show()
    //5.查询年龄大于30的
    personDF.filter($"age" > 30).show()

    sc.stop()
    spark.stop()
  }
  }

1.5、 相互转化

RDD、DF、DS之间的相互转换有很多(6种),但是我们实际操作就只有2类:
1)使用RDD算子操作
2)使用DSL/SQL对表操作

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object TransformDemo {
case class Person(id:Int,name:String,age:Int)

  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileRDD: RDD[String] = sc.textFile("D:\\data\\person.txt")
    val linesRDD: RDD[Array[String]] = fileRDD.map(_.split(" "))
    val personRDD: RDD[Person] = linesRDD.map(line =>Person(line(0).toInt,line(1),line(2).toInt))
    //3.将RDD转成DF
    //注意:RDD中原本没有toDF方法,新版本中要给它增加一个方法,可以使用隐式转换
    import spark.implicits._
    //注意:上面的rowRDD的泛型是Person,里面包含了Schema信息
    //所以SparkSQL可以通过反射自动获取到并添加给DF
    //=========================相互转换======================
    //1.RDD-->DF
    val personDF: DataFrame = personRDD.toDF
    //2.DF-->RDD
    val rdd: RDD[Row] = personDF.rdd
    //3.RDD-->DS
    val DS: Dataset[Person] = personRDD.toDS()
    //4.DS-->RDD
    val rdd2: RDD[Person] = DS.rdd
    //5.DF-->DS
    val DS2: Dataset[Person] = personDF.as[Person]
    //6.DS-->DF
    val DF: DataFrame = DS2.toDF()

    sc.stop()
    spark.stop()
  }
  }

1.6、Spark SQL完成WordCount(案例)

1.6.1、SQL风格

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


object WordCount {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.对每一行按照空格进行切分并压平
    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.对上面的数据进行WordCount
    wordDS.createOrReplaceTempView("t_word")
    val sql =
      """
        |select value ,count(value) as count
        |from t_word
        |group by value
        |order by count desc
      """.stripMargin
    spark.sql(sql).show()

    sc.stop()
    spark.stop()
  }
}

1.6.2、DQL风格

package cn.itcast.sql

import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}


object WordCount2 {
  def main(args: Array[String]): Unit = {
    //1.创建SparkSession
    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("SparkSQL").getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    //2.读取文件
    val fileDF: DataFrame = spark.read.text("D:\\data\\words.txt")
    val fileDS: Dataset[String] = spark.read.textFile("D:\\data\\words.txt")
    //fileDF.show()
    //fileDS.show()
    //3.对每一行按照空格进行切分并压平
    //fileDF.flatMap(_.split(" ")) //注意:错误,因为DF没有泛型,不知道_是String
    import spark.implicits._
    val wordDS: Dataset[String] = fileDS.flatMap(_.split(" "))//注意:正确,因为DS有泛型,知道_是String
    //wordDS.show()
    /*
    +-----+
    |value|
    +-----+
    |hello|
    |   me|
    |hello|
    |  you|
      ...
     */
    //4.对上面的数据进行WordCount
    wordDS.groupBy("value").count().orderBy($"count".desc).show()

    sc.stop()
    spark.stop()
  }
}

好了,以上内容就到这里了。你学到了吗。

到此这篇关于如何使用IDEA开发Spark SQL程序(一文搞懂)的文章就介绍到这了,更多相关IDEA开发Spark SQL内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • java多态注意项小结

    java多态注意项小结

    面向对象的三大特性:封装、继承、多态。从一定角度来看,封装和继承几乎都是为多态而准备的。今天通过本文给大家介绍java多态注意项总结,感兴趣的朋友一起看看吧
    2021-10-10
  • Java基础之自动装箱,注解操作示例

    Java基础之自动装箱,注解操作示例

    这篇文章主要介绍了Java基础之自动装箱,注解操作,结合实例形式分析了java拆箱、装箱、静态导入、注释等相关使用技巧,需要的朋友可以参考下
    2019-08-08
  • SpringBoot如何使用Undertow做服务器

    SpringBoot如何使用Undertow做服务器

    这篇文章主要介绍了SpringBoot如何使用Undertow做服务器,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-07-07
  • java 数据类型有哪些取值范围多少

    java 数据类型有哪些取值范围多少

    这篇文章主要介绍了java 数据类型有哪些取值范围多少的相关资料,网上关于java 数据类型的资料有很多,不够全面,这里就整理下,需要的朋友可以参考下
    2017-01-01
  • 详解使用spring boot admin监控spring cloud应用程序

    详解使用spring boot admin监控spring cloud应用程序

    这篇文章主要介绍了详解使用spring boot admin监控spring cloud应用程序,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2018-05-05
  • Spring Cloud Netflix架构浅析(小结)

    Spring Cloud Netflix架构浅析(小结)

    这篇文章主要介绍了Spring Cloud Netflix架构浅析(小结),详解的介绍了Spring Cloud Netflix的概念和组件等,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2018-01-01
  • java LeetCode题解KMP算法示例

    java LeetCode题解KMP算法示例

    这篇文章主要为大家介绍了java LeetCode题解KMP算法示例,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2023-10-10
  • java使用xstream实现xml文件和对象之间的相互转换

    java使用xstream实现xml文件和对象之间的相互转换

    xml是一个用途比较广泛的文件类型,在java里也自带解析xml的包,但是本文使用的是xstream来实现xml和对象之间的相互转换,xstream是一个第三方开源框架,使用起来比较方便,对java xml和对象转换相关知识感兴趣的朋友一起看看吧
    2023-09-09
  • 解决MyBatis中Enum字段参数解析问题

    解决MyBatis中Enum字段参数解析问题

    本文主要介绍了解决MyBatis中Enum字段参数解析问题,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • Java中ArrayList类的使用方法

    Java中ArrayList类的使用方法

    ArrayList就是传说中的动态数组,用MSDN中的说法,就是Array的复杂版本,下面来简单介绍下
    2013-12-12

最新评论