IDEA 开发配置SparkSQL及简单使用案例代码
1.添加依赖
在idea项目的pom.xml中添加依赖。
<!--spark sql依赖,注意版本号-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
2.案例代码
package com.zf.bigdata.spark.sql
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
object Spark01_SparkSql_Basic {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
//创建 SparkSession 对象
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
// DataFrame
val df: DataFrame = spark.read.json("datas/user.json")
//df.show()
// DataFrame => Sql
//df.createOrReplaceTempView("user")
//spark.sql("select * from user").show()
//spark.sql("select age from user").show()
//spark.sql("select avg(age) from user").show()
//DataFrame => Dsl
//如果涉及到转换操作,转换需要引入隐式转换规则,否则无法转换,比如使用$提取数据的值
//spark 不是包名,是上下文环境对象名
import spark.implicits._
//df.select("age","username").show()
//df.select($"age"+1).show()
//df.select('age+1).show()
// DataSet
//val seq = Seq(1,2,3,4)
//val ds: Dataset[Int] = seq.toDS()
// ds.show()
// RDD <=> DataFrame
val rdd = spark.sparkContext.makeRDD(List((1,"张三",10),(2,"李四",20)))
val df1: DataFrame = rdd.toDF("id", "name", "age")
val rdd1: RDD[Row] = df1.rdd
// DataFrame <=> DataSet
val ds: Dataset[User] = df1.as[User]
val df2: DataFrame = ds.toDF()
// RDD <=> DataSet
val ds1: Dataset[User] = rdd.map {
case (id, name, age) => {
User(id, name = name, age = age)
}
}.toDS()
val rdd2: RDD[User] = ds1.rdd
spark.stop()
}
case class User(id:Int,name:String,age:Int)
}
PS:下面看下在IDEA中开发Spark SQL程序
IDEA 中程序的打包和运行方式都和 SparkCore 类似,Maven 依赖中需要添加新的依赖项:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.1</version> </dependency>
一、指定Schema格式
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
object Demo1 {
def main(args: Array[String]): Unit = {
//使用Spark Session 创建表
val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()
//从指定地址创建RDD
val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))
//通过StructType声明Schema
val schema = StructType(
List(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("age", IntegerType)))
//把RDD映射到rowRDD
val rowRDD = personRDD.map(p=>Row(p(0).toInt,p(1),p(2).toInt))
val personDF = spark.createDataFrame(rowRDD, schema)
//注册表
personDF.createOrReplaceTempView("t_person")
//执行SQL
val df = spark.sql("select * from t_person order by age desc limit 4")
df.show()
spark.stop()
}
}
二、使用case class
import org.apache.spark.sql.SparkSession
//使用case class
object Demo2 {
def main(args: Array[String]): Unit = {
//创建SparkSession
val spark = SparkSession.builder().master("local").appName("CaseClassDemo").getOrCreate()
//从指定的文件中读取数据,生成对应的RDD
val lineRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))
//将RDD和case class 关联
val studentRDD = lineRDD.map( x => Student(x(0).toInt,x(1),x(2).toInt))
//生成 DataFrame,通过RDD 生成DF,导入隐式转换
import spark.sqlContext.implicits._
val studentDF = studentRDD.toDF
//注册表 视图
studentDF.createOrReplaceTempView("student")
//执行SQL
spark.sql("select * from student").show()
spark.stop()
}
}
//case class 一定放在外面
case class Student(stuID:Int,stuName:String,stuAge:Int)
三、把数据保存到数据库
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.Row
import java.util.Properties
object Demo3 {
def main(args: Array[String]): Unit = {
//使用Spark Session 创建表
val spark = SparkSession.builder().master("local").appName("UnderstandSparkSession").getOrCreate()
//从指定地址创建RDD
val personRDD = spark.sparkContext.textFile("D:\\tmp_files\\student.txt").map(_.split("\t"))
//通过StructType声明Schema
val schema = StructType(
List(
StructField("id", IntegerType),
StructField("name", StringType),
StructField("age", IntegerType)))
//把RDD映射到rowRDD
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1), p(2).toInt))
val personDF = spark.createDataFrame(rowRDD, schema)
//注册表
personDF.createOrReplaceTempView("person")
//执行SQL
val df = spark.sql("select * from person ")
//查看SqL内容
//df.show()
//将结果保存到mysql中
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
props.setProperty("driver", "com.mysql.jdbc.Driver")
df.write.mode("overwrite").jdbc("jdbc:mysql://localhost:3306/company?serverTimezone=UTC&characterEncoding=utf-8", "student", props)
spark.close()
}
}
以上内容转自:
https://blog.csdn.net/weixin_43520450/article/details/106093582
作者:故明所以
到此这篇关于IDEA 开发配置SparkSQL及简单使用案例代码的文章就介绍到这了,更多相关IDEA 开发 SparkSQL内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
谈谈Java中整数类型(short int long)的存储方式
在java中的整数类型有四种,分别是byte short in long,本文重点给大家介绍java中的整数类型(short int long),由于byte只是一个字节0或1,在此就不多说了,对java中的整数类型感兴趣的朋友一起学习吧2015-11-11
java使用BeanUtils.copyProperties方法对象复制同名字段类型不同赋值为空问题解决方案
这篇文章主要给大家介绍了关于java使用BeanUtils.copyProperties方法对象复制同名字段类型不同赋值为空问题的解决方案,文中通过代码介绍的非常详细,对大家的学习或者工作具有一定的参考借鉴价值,需要的朋友可以参考下2023-11-11
修改xml文件再也不用重启项目mybatis-xmlreload方法
这篇文章主要为大家介绍了修改xml文件再也不用重启项目mybatis-xmlreload,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2023-03-03
PowerJob的TimingStrategyHandler工作流程源码解读
这篇文章主要为大家介绍了PowerJob的TimingStrategyHandler工作流程源码解读,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪2024-01-01
详解PipedInputStream和PipedOutputStream_动力节点Java学院整理
这篇文章主要为大家详细介绍了管道PipedInputStream和PipedOutputStream,具有一定的参考价值,感兴趣的小伙伴们可以参考一下2017-05-05


最新评论