IDEA 开发配置SparkSQL及简单使用案例代码

 更新时间:2021年08月10日 10:44:16   作者:zhangfei_bk  
这篇文章主要介绍了IDEA 开发配置SparkSQL及简单使用案例代码,本文通过代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

1.添加依赖

在idea项目的pom.xml中添加依赖。

<!--spark sql依赖,注意版本号-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

2.案例代码

package com.zf.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Spark01_SparkSql_Basic {

    def main(args: Array[String]): Unit = {

        //创建上下文环境配置对象
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
        //创建 SparkSession 对象
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()

        // DataFrame
        val df: DataFrame = spark.read.json("datas/user.json")
        //df.show()

        // DataFrame => Sql

        //df.createOrReplaceTempView("user")
        //spark.sql("select * from user").show()
        //spark.sql("select age from user").show()
        //spark.sql("select avg(age) from user").show()

        //DataFrame => Dsl

        //如果涉及到转换操作,转换需要引入隐式转换规则,否则无法转换,比如使用$提取数据的值
        //spark 不是包名,是上下文环境对象名
        import spark.implicits._
        //df.select("age","username").show()
        //df.select($"age"+1).show()
        //df.select('age+1).show()

        // DataSet

        //val seq = Seq(1,2,3,4)
        //val ds: Dataset[Int] = seq.toDS()
        // ds.show()

        // RDD <=> DataFrame
        val rdd = spark.sparkContext.makeRDD(List((1,"张三",10),(2,"李四",20)))
        val df1: DataFrame = rdd.toDF("id", "name", "age")
        val rdd1: RDD[Row] = df1.rdd

        // DataFrame <=> DataSet
        val ds: Dataset[User] = df1.as[User]
        val df2: DataFrame = ds.toDF()

        // RDD <=> DataSet
        val ds1: Dataset[User] = rdd.map {
            case (id, name, age) => {
                User(id, name = name, age = age)
            }
        }.toDS()
        val rdd2: RDD[User] = ds1.rdd

        spark.stop()
    }
    case class User(id:Int,name:String,age:Int)

}

PS:下面看下在IDEA中开发Spark SQL程序

IDEA 中程序的打包和运行方式都和 SparkCore 类似,Maven 依赖中需要添加新的依赖项:

<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-sql_2.11</artifactId>
	<version>2.1.1</version>
</dependency>

一、指定Schema格式

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row

object Demo1 {
  def main(args: Array[String]): Unit = {
    //使用Spark Session 创建表
    val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()

    //从指定地址创建RDD
    val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

    //通过StructType声明Schema
    val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("name", StringType),
        StructField("age", IntegerType)))

    //把RDD映射到rowRDD
    val rowRDD = personRDD.map(p=>Row(p(0).toInt,p(1),p(2).toInt))
    val personDF = spark.createDataFrame(rowRDD, schema)

    //注册表
    personDF.createOrReplaceTempView("t_person")

    //执行SQL
    val df = spark.sql("select * from t_person order by age desc limit 4")
    df.show()
    spark.stop()

  }
}

二、使用case class

import org.apache.spark.sql.SparkSession

//使用case class
object Demo2 {

  def main(args: Array[String]): Unit = {
    //创建SparkSession
    val spark = SparkSession.builder().master("local").appName("CaseClassDemo").getOrCreate()

    //从指定的文件中读取数据,生成对应的RDD
    val lineRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

    //将RDD和case class 关联
    val studentRDD = lineRDD.map( x => Student(x(0).toInt,x(1),x(2).toInt))

    //生成 DataFrame,通过RDD 生成DF,导入隐式转换
    import spark.sqlContext.implicits._
    val studentDF = studentRDD.toDF

    //注册表 视图
    studentDF.createOrReplaceTempView("student")

    //执行SQL
    spark.sql("select * from student").show()

    spark.stop()
  }
}

//case class 一定放在外面
case class Student(stuID:Int,stuName:String,stuAge:Int)

三、把数据保存到数据库

import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
import java.util.Properties

object Demo3 {
  def main(args: Array[String]): Unit = {
    //使用Spark Session 创建表
    val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()

    //从指定地址创建RDD
    val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))

    //通过StructType声明Schema
    val schema = StructType(
      List(
        StructField("id", IntegerType),
        StructField("name", StringType),
        StructField("age", IntegerType)))

    //把RDD映射到rowRDD
    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))

    val personDF = spark.createDataFrame(rowRDD, schema)

    //注册表
    personDF.createOrReplaceTempView("person")

    //执行SQL
    val df = spark.sql("select * from person ")

    //查看SqL内容
    //df.show()

    //将结果保存到mysql中
    val props = new Properties()
    props.setProperty("user", "root")
    props.setProperty("password", "123456")
    props.setProperty("driver", "com.mysql.jdbc.Driver")
    df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)
    spark.close()

  }
}

以上内容转自:
https://blog.csdn.net/weixin_43520450/article/details/106093582
作者:故明所以

到此这篇关于IDEA 开发配置SparkSQL及简单使用案例代码的文章就介绍到这了,更多相关IDEA 开发 SparkSQL内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • SpringBoot整合Canal与RabbitMQ监听数据变更记录

    SpringBoot整合Canal与RabbitMQ监听数据变更记录

    这篇文章主要介绍了SpringBoot整合Canal与RabbitMQ监听数据变更记录,文章围绕主题展开详细的内容介绍,具有一定的参考价值,需要的小伙伴可以参考一下
    2022-09-09
  • 基于SpringBoot后端导出Excel文件的操作方法

    基于SpringBoot后端导出Excel文件的操作方法

    这篇文章给大家介绍了基于SpringBoot后端导出Excel文件的操作方法,文中通过代码示例给大家介绍的非常详细,对大家的学习或工作有一定的帮助,需要的朋友可以参考下
    2024-02-02
  • Java使用Swagger接口框架方法详解

    Java使用Swagger接口框架方法详解

    这篇文章主要介绍了Java使用Swagger接口框架方法,Swagger是一个方便我们更好的编写API文档的框架,而且Swagger可以模拟http请求调用,感兴趣的同学可以参考下文
    2023-05-05
  • java8中新的Date和Time详解

    java8中新的Date和Time详解

    这篇文章主要是java8中新的Date和Time,探讨新Date类和Time类背后的设计原则,有所需要的小伙伴希望能帮助到你
    2016-07-07
  • Spring Data环境搭建实现过程解析

    Spring Data环境搭建实现过程解析

    这篇文章主要介绍了Spring Data环境搭建实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2020-08-08
  • 使用Maven将springboot工程打包成docker镜像

    使用Maven将springboot工程打包成docker镜像

    这篇文章主要介绍了使用Maven将springboot工程打包成docker镜像,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-12-12
  • 详解springboot使用异步注解@Async获取执行结果的坑

    详解springboot使用异步注解@Async获取执行结果的坑

    本文主要介绍了springboot使用异步注解@Async获取执行结果的坑,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2021-08-08
  • SpringBoot如何通过自定义注解实现权限检查详解

    SpringBoot如何通过自定义注解实现权限检查详解

    这篇文章主要给大家介绍了关于SpringBoot如何通过自定义注解实现权限检查的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-10-10
  • SpringBoot集成Druid连接池进行SQL监控的问题解析

    SpringBoot集成Druid连接池进行SQL监控的问题解析

    这篇文章主要介绍了SpringBoot集成Druid连接池进行SQL监控的问题解析,在SpringBoot工程中引入Druid连接池非常简单,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下
    2021-07-07
  • 如何禁用IntelliJ IDEA的LightEdit模式(推荐)

    如何禁用IntelliJ IDEA的LightEdit模式(推荐)

    这篇文章主要介绍了如何禁用IntelliJ IDEA的LightEdit模式,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2020-04-04

最新评论