Spark-Shell的启动与运行实现过程

 更新时间:2026年01月17日 09:50:49   作者:LMY~~  
文章介绍了如何启动Spark,并演示了Spark RDD的基本操作,包括从文件系统和集合创建RDD,以及使用RDD编程API进行数据处理,如map、filter、flatMap、reduceByKey等,通过实例,展示了如何对RDD进行基本的数学运算、字符串处理和数据统计

一、启动spark

1.先启动zookeeper

三台虚拟机都要启动

zkServer.sh start

2.启动hadoop

start-all.sh

3.启动spark

在spark的根目录下输入

sbin/start-all.sh
spark-shell

二、Spark Rdd的简单操作

1.从文件系统加载数据创建ADD

(1)从Linux本地文件系统加载数据创建RDD——textFile(path)

val rdd = sc.textFile("file:///root/word.txt")
rdd.collect()   //查看命令

(2)从HDFS中加载数据创建RDD

val rdd = sc.textFile("/spark/test/word.txt")
rdd.collect()

scala> val rdd = sc.textFile("/spark/test/word.txt")
rdd: org.apache.spark.rdd.RDD[String] = /spark/test/word.txt MapPartitionsRDD[60] at textFile at :24
scala> rdd.collect()
res27: Array[String] = Array(hello java, hello hadoop, hello mysql)

2.通过集合创建RDD——prarallize()

从一个已经存在的集合、数组,通过sarkContext对象调用parallelize的方法创建RDD,

val array =Array(1,2,3,4,5)
val arrRdd = sc.parallelize(array)
arrRdd.collect()

scala> val array =Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val arrRdd = sc.parallelize(array)
arrRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[61] at parallelize at :26
scala> arrRdd.collect()
res29: Array[Int] = Array(1, 2, 3, 4, 5)

3.RDD的处理

一些RDD编程API

命令含义
map()返回一个新的rdd,由()转换后组成
filter()过滤,由()函数计算后返回值为true的元素组成
flatMap()类似于map,但输入元素可以被映射,用于词频拆分
union()相当于数学中集合的并集
intersection()相当于数学中集合的交集
distinct()去重操作后返回一个新的rdd
groupByKey()返回一个(l,iterator[数据类型]) 的rdd
reduceByKey()在一个(k,v)对的rdd上调用,返回一个新的(k,v)对rdd,用于词频统计
sortByKey在一个(k,v)对的rdd上调用,第二个值为true时按从小到大排序,false为从大到小排序
join()返回相同的key对应的所有元素,如(K,(V,W))

(1)案例1

通过并进行生成rdd
val rdd1 =List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)
val rdd2 =sc.parallelize(rdd1)

scala> val rdd1 =List(5,6,4,7,3,8,2,9,1,10)
rdd1: List[Int] = List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)
scala> val rdd2 =sc.parallelize(rdd1)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :26
scala> rdd2.collect()
res3: Array[Int] = Array(5, 6, 4, 7, 3, 8, 2, 9, 1, 10)

对rdd1里的每一个元素乘2然后排序
val rdd3=rdd2.map(x=>x*2)
rdd3.collect()

scala> val rdd3=rdd2.map(x=>x*2)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :28
scala> rdd3.collect()
res4: Array[Int] = Array(10, 12, 8, 14, 6, 16, 4, 18, 2, 20)

val rdd4=rdd3.sortBy(x=>x,true)
 rdd4.collect()

scala> val rdd4=rdd3.sortBy(x=>x,true)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at sortBy at :30
scala> rdd4.collect()
res5: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

(2)实例2

val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))
rdd1.collect()

scala> val rdd1 = sc.parallelize(Array(“a b c”, “d e f”, “h i j”))
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at :24
scala> rdd1.collect()
res6: Array[String] = Array(a b c, d e f, h i j)

将rdd1里面的每一个元素先切分在压平
val rdd2 = rdd1.flatMap(x=>x.split(" "))
rdd2.collect

scala> val rdd2 = rdd1.flatMap(x=>x.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[8] at flatMap at :26
scala> rdd2.collect()
res7: Array[String] = Array(a, b, c, d, e, f, h, i, j)

(3)实例3

计数word.txt中单词的数量

 val rdd = sc.textFile("/spark/test/word.txt")   //从HDFS中加载数据创建RDD
 val rdd1=rdd.flatMap(x=>x.split(" "))			//将rdd用空格分开	
 val rdd2=rdd1.map(x=>(x,1))					//将不同的单词(k,v)=(k,1)
 rdd2.collect()
 val rdd3=rdd2.groupByKey()						//相同的k放到一起
 rdd3.collect()
 val rdd4=rdd2.reduceByKey((a,b)=>a+b)			//将单词进行数量统计
 rdd4.collect()

scala> val rdd = sc.textFile("/spark/test/word.txt")
rdd: org.apache.spark.rdd.RDD[String] = /spark/test/word.txt MapPartitionsRDD[10] at textFile at :24
scala> val rdd1=rdd.flatMap(x=>x.split(" "))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at flatMap at :26
scala> val rdd2=rdd1.map(x=>(x,1))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at :28
scala> rdd2.collect()
res9: Array[(String, Int)] = Array((hello,1), (java,1), (hello,1), (hadoop,1), (hello,1), (mysql,1))
scala> val rdd3=rdd2.groupByKey()
rdd3: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[13] at groupByKey at :30
scala> rdd3.collect()
res10: Array[(String, Iterable[Int])] = Array((hadoop,CompactBuffer(1)), (mysql,CompactBuffer(1)), (hello,CompactBuffer(1, 1, 1)), (java,CompactBuffer(1)))
scala> val rdd4=rdd2.reduceByKey((a,b)=>a+b)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at :30
scala> rdd4.collect()
res11: Array[(String, Int)] = Array((hadoop,1), (mysql,1), (hello,3), (java,1))

(4)实例4

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 3), ("kitty", 2)))
val rdd2 = sc.parallelize(List(("jerry", 2), ("tom", 1), ("shuke", 2)))
val rdd3 = rdd1.join(rdd2)		//求join
val rdd4 = rdd1.union(rdd2)		//求并集
val rdd5 = rdd4.groupByKey()	//按key分组
rdd5.collect					//查看

res11: Array[(String, Int)] = Array((hadoop,1), (mysql,1), (hello,3), (java,1))
scala> val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at :24
scala> val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 1), (“shuke”, 2)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[16] at parallelize at :24
scala> val rdd3 = rdd1.join(rdd2)
rdd3: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[19] at join at :28
scala> val rdd4 = rdd1.union(rdd2)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = UnionRDD[20] at union at :28
scala> rdd3.collect()
res12: Array[(String, (Int, Int))] = Array((tom,(1,1)), (jerry,(3,2)))
scala> rdd4.collect()
res13: Array[(String, Int)] = Array((tom,1), (jerry,3), (kitty,2), (jerry,2), (tom,1), (shuke,2))
scala> val rdd5 = rdd4.groupByKey()
rdd5: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[21] at groupByKey at :30
scala> rdd5.collect()
res14: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(1, 1)), (jerry,CompactBuffer(3, 2)), (shuke,CompactBuffer(2)), (kitty,CompactBuffer(2)))

(5)实例5

val rdd1 = sc.parallelize(List(5, 6, 4, 3))
val rdd2 = sc.parallelize(List(1, 2, 3, 4))
val rdd3 = rdd1.union(rdd2)			//求并集
rdd3.collect()
val rdd4 = rdd1.intersection(rdd2) //求交集
rdd4.collect()
val rdd5 = rdd4.distinct()			//去重
rdd5.collect						//查看

scala> val rdd1 = sc.parallelize(List(5, 6, 4, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at :24
scala> val rdd2 = sc.parallelize(List(1, 2, 3, 4))
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[23] at parallelize at :24
scala> val rdd3 = rdd1.union(rdd2)
rdd3: org.apache.spark.rdd.RDD[Int] = UnionRDD[24] at union at :28
scala> rdd3.collect()
res15: Array[Int] = Array(5, 6, 4, 3, 1, 2, 3, 4)
scala> val rdd4 = rdd1.intersection(rdd2)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[30] at intersection at :28
scala> rdd4.collect()
res16: Array[Int] = Array(4, 3)
scala> val rdd5 = rdd4.distinct()
rdd5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[33] at distinct at :30
scala> rdd5.collect()
res17: Array[Int] = Array(4, 3)

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:

相关文章

  • Linux命令dos2unix命令示例详解(将DOS格式文本文件转换成Unix格式)

    Linux命令dos2unix命令示例详解(将DOS格式文本文件转换成Unix格式)

    dos2unix命令 用来将DOS格式的文本文件转换成UNIX格式的,而Unix格式的文本文件在Windows下用Notepad打开时会拼在一起显示,本文介绍Linux命令dos2unix命令示例详解(将DOS格式文本文件转换成Unix格式),感兴趣的朋友一起看看吧
    2024-04-04
  • Shell过滤器的具体使用

    Shell过滤器的具体使用

    这篇文章主要介绍了Shell过滤器的具体使用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2021-03-03
  • 基于shell的if和else详解

    基于shell的if和else详解

    下面小编就为大家带来一篇基于shell的if和else详解。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2017-06-06
  • Shell脚本中获取本机ip地址的3个方法

    Shell脚本中获取本机ip地址的3个方法

    这篇文章主要介绍了Shell脚本中获取本机ip地址的3个方法,本文直接给出实现代码,需要的朋友可以参考下
    2014-10-10
  • 用shell命令读取与输出数据的代码

    用shell命令读取与输出数据的代码

    本文为大家介绍使用shell命令进行读取与输出数据的方法,其中涉及了文件输出、重定向、管道等相关知识,有兴趣的朋友可以参考下
    2013-02-02
  • Linux下shell基本命令之grep用法及示例小结

    Linux下shell基本命令之grep用法及示例小结

    grep是Unix/Linux系统中用于文本搜索的强大工具,它可以忽略大小写、显示行号、反向选择、递归搜索目录等,本文就来介绍一下,感兴趣的可以了解一下
    2024-12-12
  • shell脚本运行5秒后自动退出的代码

    shell脚本运行5秒后自动退出的代码

    shell脚本运行5秒自动退出的代码,供大家学习参考
    2013-02-02
  • nvidia-smi命令详解和一些高阶技巧讲解

    nvidia-smi命令详解和一些高阶技巧讲解

    一般情况下用的比较多的就是nvidia-smi的命令,其实掌握了这一个命令也就能够覆盖绝大多数场景了,但是本质求真务实的态度,本文调研了相关资料,整理了一些比较常用的nvidia-smi命令的其他用法,感兴趣的朋友跟随小编一起看看吧
    2023-01-01
  • linux shell字符串截取的详细总结(实用!)

    linux shell字符串截取的详细总结(实用!)

    在开发的时候经常会自行写一些小的脚本,其中就用到截取字符串的操作,这篇文章主要给大家介绍了关于linux shell字符串截取的详细方法,文中通过实例代码介绍的非常详细,需要的朋友可以参考下
    2022-07-07
  • 详解Linux定时任务Crontab的介绍与使用

    详解Linux定时任务Crontab的介绍与使用

    linux内置的cron进程能帮我们实现这些需求,cron搭配shell脚本,非常复杂的指令也没有问题。本文主要介绍了定时任务Crontab的使用,需要的可以学习一下
    2022-10-10

最新评论