Spark学习笔记之Spark SQL的具体使用

 更新时间:2019年06月14日 10:12:28   作者:EVAO_大个子  
这篇文章主要介绍了Spark学习笔记之Spark SQL的具体使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

1. Spark SQL是什么?

  • 处理结构化数据的一个spark的模块
  • 它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用

2. Spark SQL的特点

  • 多语言的接口支持(java python scala)
  • 统一的数据访问
  • 完全兼容hive
  • 支持标准的连接

3. 为什么学习SparkSQL?

我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

4. DataFrame(数据框)

  • 与RDD类似,DataFrame也是一个分布式数据容器
  • 然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema
  • DataFrame其实就是带有schema信息的RDD

5. SparkSQL1.x的API编程

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>

5.1 使用sqlContext创建DataFrame(测试用)

object Ops3 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"),Person("admin2", 16, "man"),Person("admin3", 18, "man")))
    val df1: DataFrame = sqlContext.createDataFrame(rdd1)
    df1.show(1)
  }
}
case class Person(name: String, age: Int, sex: String);

5.2 使用sqlContxet中提供的隐式转换函数(测试用)

import org.apache.spark
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val rdd1 = sc.parallelize(List(Person("admin1", 14, "man"), Person("admin2", 16, "man"), Person("admin3", 18, "man")))
import sqlContext.implicits._
val df1: DataFrame = rdd1.toDF
df1.show()
5.3 使用SqlContext创建DataFrame(常用)
val conf = new SparkConf().setAppName("Ops3").setMaster("local[3]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowRDD: RDD[Row] = linesRDD.map(line => {
 val lineSplit: Array[String] = line.split(",")
 Row(lineSplit(0), lineSplit(1).toInt, lineSplit(2))
})
val rowDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
rowDF.show()

6. 使用新版本的2.x的API

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

df.createOrReplaceTempView("p1")
val df2 = sparkSession.sql("select * from p1")
df2.show()

7. 操作SparkSQL的方式

7.1 使用SQL语句的方式对DataFrame进行操作

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()//Spark2.x新的API相当于Spark1.x的SQLContext
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val df: DataFrame = sparkSession.createDataFrame(rowRDD, schema)

df.createOrReplaceTempView("p1")//这是Sprk2.x新的API 相当于Spark1.x的registTempTable()
val df2 = sparkSession.sql("select * from p1")
df2.show()

7.2 使用DSL语句的方式对DataFrame进行操作

DSL(domain specific language ) 特定领域语言

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest/")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.show()

8. SparkSQL的输出

8.1 写出到JSON文件

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
df.write.json("hdfs://uplooking02:8020/sparktest1")

8.2 写出到关系型数据库(mysql)

val conf = new SparkConf().setAppName("Ops5") setMaster ("local[3]")
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext
val linesRDD: RDD[String] = sc.textFile("hdfs://uplooking02:8020/sparktest")
//数据清洗
val rowRDD: RDD[Row] = linesRDD.map(line => {
  val splits: Array[String] = line.split(",")
  Row(splits(0), splits(1).toInt, splits(2))
})
val schema = StructType(List(StructField("name", StringType), StructField("age", IntegerType), StructField("sex", StringType)))
val rowDF: DataFrame = sparkSession.createDataFrame(rowRDD, schema)
import sparkSession.implicits._
val df: DataFrame = rowDF.select("name", "age").where("age>10").orderBy($"age".desc)
val url = "jdbc:mysql://localhost:3306/test"
//表会自动创建
val tbName = "person1";
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
//SaveMode 默认为ErrorIfExists
df.write.mode(SaveMode.Append).jdbc(url, tbName, prop)

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。

相关文章

  • 使用IntelliJ IDEA创建简单的Java Web项目完整步骤

    使用IntelliJ IDEA创建简单的Java Web项目完整步骤

    这篇文章主要介绍了如何使用IntelliJ IDEA创建一个简单的JavaWeb项目,实现登录、注册和查看用户列表功能,使用Servlet和JSP技术,文中通过代码介绍的非常详细,需要的朋友可以参考下
    2025-01-01
  • 深入了解Spring中Bean的作用域和生命周期

    深入了解Spring中Bean的作用域和生命周期

    这篇文章主要介绍了深入了解Spring中Bean的作用域和生命周期,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-10-10
  • 解决SpringBoot连接SqlServer出现的问题

    解决SpringBoot连接SqlServer出现的问题

    在尝试通过SSL与SQL Server建立安全连接时,如果遇到“PKIX path building failed”错误,可能是因为未能正确配置或信任服务器证书,当"Encrypt"属性设置为"true"且"trustServerCertificate"属性设置为"false"时,要求驱动程序使用安全套接字层(SSL)加密与SQL Server建立连接
    2024-10-10
  • java开发BeanUtils类解决实体对象间赋值

    java开发BeanUtils类解决实体对象间赋值

    这篇文章主要为大家介绍了java开发中使用BeanUtils类实现实体对象之间的赋值有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步学有所得
    2021-10-10
  • Jedis零基础入门及操作Redis中的数据结构详解

    Jedis零基础入门及操作Redis中的数据结构详解

    Jedis 的 API 方法跟 Redis 的命令基本上完全一致,熟悉 Redis 的操作命令,自然就很容易使用 Jedis,因此官方也推荐 Java 使用 Jedis 来连接和操作 Redis
    2022-09-09
  • 解决Mybatis报错:org.apache.ibatis.reflection.ReflectionException: There is no getter for property named问题

    解决Mybatis报错:org.apache.ibatis.reflection.ReflectionException

    文章主要讨论了在使用MyBatis进行数据库操作时遇到的几个常见问题及其解决方法,首先,文章指出如果DTO类中没有定义getter和setter方法,会导致反射异常,解决方法是使用Lombok的@Data注解自动生成这些方法
    2025-01-01
  • Java环境配置原理全面解析

    Java环境配置原理全面解析

    下面小编就为大家带来一篇Java环境配置原理全面解析。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2016-09-09
  • MyBatis Mapper.XML 标签使用小结

    MyBatis Mapper.XML 标签使用小结

    在MyBatis中,通过resultMap可以解决字段名和属性名不一致的问题,对于复杂的查询,引用实体或使用<sql>标签可以定义复用的SQL片段,提高代码的可读性和编码效率,使用这些高级映射和动态SQL技巧,可以有效地处理复杂的数据库交互场景
    2024-10-10
  • Java简单实现调用命令行并获取执行结果示例

    Java简单实现调用命令行并获取执行结果示例

    这篇文章主要介绍了Java简单实现调用命令行并获取执行结果,结合实例形式分析了Java调用ping命令并获取执行结果相关操作技巧,需要的朋友可以参考下
    2018-08-08
  • SpringMVC实战案例RESTFul实现添加功能

    SpringMVC实战案例RESTFul实现添加功能

    这篇文章主要为大家介绍了SpringMVC实战案例RESTFul实现添加功能详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
    2022-05-05

最新评论