Java和scala实现 Spark RDD转换成DataFrame的两种方法小结

 更新时间:2018年06月07日 09:04:25   作者:黑白调92  
今天小编就为大家分享一篇Java和scala实现 Spark RDD转换成DataFrame的两种方法小结,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧

一:准备数据源

在项目下新建一个student.txt文件,里面的内容为:

1,zhangsan,20 
2,lisi,21 
3,wanger,19 
4,fangliu,18 

二:实现

Java版:

1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:

package com.cxd.sql;
import java.io.Serializable;
@SuppressWarnings("serial")
public class Student implements Serializable {
 String sid;
 String sname;
 int sage;
 public String getSid() {
  return sid;
 }
 public void setSid(String sid) {
  this.sid = sid;
 }
 public String getSname() {
  return sname;
 }
 public void setSname(String sname) {
  this.sname = sname;
 }
 public int getSage() {
  return sage;
 }
 public void setSage(int sage) {
  this.sage = sage;
 }
 @Override
 public String toString() {
  return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
 }
 
}
		

2.转换,具体代码如下

package com.cxd.sql;
import java.util.ArrayList;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class TxtToParquetDemo {
 public static void main(String[] args) {
  
  SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
  reflectTransform(spark);//Java反射
  dynamicTransform(spark);//动态转换
 }
 
 /**
  * 通过Java反射转换
  * @param spark
  */
 private static void reflectTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
  
  JavaRDD<Student> rowRDD = source.map(line -> {
   String parts[] = line.split(",");
   Student stu = new Student();
   stu.setSid(parts[0]);
   stu.setSname(parts[1]);
   stu.setSage(Integer.valueOf(parts[2]));
   return stu;
  });
  
  Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
  df.select("sid", "sname", "sage").
  coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
 }
 /**
  * 动态转换
  * @param spark
  */
 private static void dynamicTransform(SparkSession spark)
 {
  JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();
  
  JavaRDD<Row> rowRDD = source.map( line -> {
   String[] parts = line.split(",");
   String sid = parts[0];
   String sname = parts[1];
   int sage = Integer.parseInt(parts[2]);
   
   return RowFactory.create(
     sid,
     sname,
     sage
     );
  });
  
  ArrayList<StructField> fields = new ArrayList<StructField>();
  StructField field = null;
  field = DataTypes.createStructField("sid", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sname", DataTypes.StringType, true);
  fields.add(field);
  field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
  fields.add(field);
  
  StructType schema = DataTypes.createStructType(fields);
  
  Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
  df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
  
  
 }
 
}

scala版本:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType
object RDD2Dataset {
 
 case class Student(id:Int,name:String,age:Int)
 def main(args:Array[String])
 {
 
 val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
 import spark.implicits._
 reflectCreate(spark)
 dynamicCreate(spark)
 }
 
 /**
	 * 通过Java反射转换
	 * @param spark
	 */
 private def reflectCreate(spark:SparkSession):Unit={
 import spark.implicits._
 val stuRDD=spark.sparkContext.textFile("student2.txt")
 //toDF()为隐式转换
 val stuDf=stuRDD.map(_.split(",")).map(parts⇒Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()
 //stuDf.select("id","name","age").write.text("result") //对写入文件指定列名
 stuDf.printSchema()
 stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //将查询结果写入一个文件
 nameDf.show()
 }
 
 /**
	 * 动态转换
	 * @param spark
	 */
 private def dynamicCreate(spark:SparkSession):Unit={
 val stuRDD=spark.sparkContext.textFile("student.txt")
 import spark.implicits._
 val schemaString="id,name,age"
 val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
 val schema=StructType(fields)
 val rowRDD=stuRDD.map(_.split(",")).map(parts⇒Row(parts(0),parts(1),parts(2)))
 val stuDf=spark.createDataFrame(rowRDD, schema)
  stuDf.printSchema()
 val tmpView=stuDf.createOrReplaceTempView("student")
 val nameDf=spark.sql("select name from student where age<20")
 //nameDf.write.text("result") //将查询结果写入一个文件
 nameDf.show()
 }
}

注:

1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。

2.此代码不适用于spark2.0以前的版本。

以上这篇Java和scala实现 Spark RDD转换成DataFrame的两种方法小结就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持脚本之家。

相关文章

  • Mybatis-Plus时间范围查询方式详解

    Mybatis-Plus时间范围查询方式详解

    这篇文章主要介绍了Mybatis-Plus时间范围查询方式详解,通过两种方式结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-09-09
  • 在Eclipse中更改maven项目名的方法

    在Eclipse中更改maven项目名的方法

    今天小编就为大家分享一篇关于在Eclipse中更改maven项目名的方法,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧
    2019-03-03
  • java中反射和注解的简单使用方法

    java中反射和注解的简单使用方法

    相信大家对注解和反射应该并不陌生,在现在信息飞速发展的年代,各种优秀的框架或许都离不开注解的使用,这篇文章主要给大家介绍了关于java中反射和注解的简单使用方法,需要的朋友可以参考下
    2021-08-08
  • springboot 如何解决static调用service为null

    springboot 如何解决static调用service为null

    这篇文章主要介绍了springboot 如何解决static调用service为null的问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2021-06-06
  • 2020年支持java8的Java反编译工具汇总(推荐)

    2020年支持java8的Java反编译工具汇总(推荐)

    这篇文章主要介绍了2020年支持java8的Java反编译工具汇总,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2020-06-06
  • java获取网络类型的方法

    java获取网络类型的方法

    这篇文章主要介绍了java获取网络类型的方法,涉及java针对网络类型的参数获取及判定技巧,具有一定参考借鉴价值,需要的朋友可以参考下
    2015-10-10
  • Java全面深入探究SpringBoot拦截器与文件上传

    Java全面深入探究SpringBoot拦截器与文件上传

    拦截器对使用SpringMvc、Struts的开发人员来说特别熟悉,因为你只要想去做好一个项目必然会用到它,文件上传是一个很常见的功能。在项目开发过程中,我们通常都会使用一些成熟的上传组件来实现对应的功能
    2022-05-05
  • 利用线程实现动态显示系统时间

    利用线程实现动态显示系统时间

    编写Applet小程序,通过在HTML文档中接收参数,显示当前的系统时间,需要的朋友可以参考下
    2015-10-10
  • java中定义常量方法介绍

    java中定义常量方法介绍

    java中只有static和非static变量,这个属于每个类的,如果需要全局变量比如PI(3.14...),可以写一个类Math,定义static变量PI,调用Math.PI就可以使用了,这样就达到我们使用全局变量的目的了
    2013-10-10
  • Caused by: java.io.IOException: DerInputStream.getLength(): lengthTag=111

    Caused by: java.io.IOException: DerInputStrea

    这篇文章主要介绍了Caused by: java.io.IOException: DerInputStream.getLength(): lengthTag=111, too big,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2024-10-10

最新评论