Spark的广播变量和累加器使用方法代码示例

 更新时间:2017年09月29日 10:57:24   作者:唐予之_  
这篇文章主要介绍了Spark的广播变量和累加器使用方法代码示例,文中介绍了广播变量和累加器的含义,然后通过实例演示了其用法,需要的朋友可以参考下。

一、广播变量和累加器

通常情况下,当向Spark操作(如map,reduce)传递一个函数时,它会在一个远程集群节点上执行,它会使用函数中所有变量的副本。这些变量被复制到所有的机器上,远程机器上并没有被更新的变量会向驱动程序回传。在任务之间使用通用的,支持读写的共享变量是低效的。尽管如此,Spark提供了两种有限类型的共享变量,广播变量和累加器。

1.1 广播变量:

广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。

Spark的动作通过一系列的步骤执行,这些步骤由分布式的shuffle操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

通过在一个变量v上调用SparkContext.broadcast(v)可以创建广播变量。广播变量是围绕着v的封装,可以通过value方法访问这个变量。举例如下:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

在创建了广播变量之后,在集群上的所有函数中应该使用它来替代使用v.这样v就不会不止一次地在节点之间传输了。另外,为了确保所有的节点获得相同的变量,对象v在被广播之后就不应该再修改。

1.2 累加器:

累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持)

累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。

下面的代码展示了如何把一个数组中的所有元素累加到累加器上:

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Int = 10

尽管上面的例子使用了内置支持的累加器类型Int,但是开发人员也可以通过继承AccumulatorParam类来创建它们自己的累加器类型。AccumulatorParam接口有两个方法:
zero方法为你的类型提供一个0值。
addInPlace方法将两个值相加。
假设我们有一个代表数学vector的Vector类。我们可以向下面这样实现:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
 def zero(initialValue: Vector): Vector = {
  Vector.zeros(initialValue.size)
 }
 def addInPlace(v1: Vector, v2: Vector): Vector = {
  v1 += v2
 }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

在Scala里,Spark提供更通用的累加接口来累加数据,尽管结果的类型和累加的数据类型可能不一致(例如,通过收集在一起的元素来创建一个列表)。同时,SparkContext..accumulableCollection方法来累加通用的Scala的集合类型。

累加器仅仅在动作操作内部被更新,Spark保证每个任务在累加器上的更新操作只被执行一次,也就是说,重启任务也不会更新。在转换操作中,用户必须意识到每个任务对累加器的更新操作可能被不只一次执行,如果重新执行了任务和作业的阶段。

累加器并没有改变Spark的惰性求值模型。如果它们被RDD上的操作更新,它们的值只有当RDD因为动作操作被计算时才被更新。因此,当执行一个惰性的转换操作,比如map时,不能保证对累加器值的更新被实际执行了。下面的代码片段演示了此特性:

val accum = sc.accumulator(0)
data.map { x => accum += x; f(x) }
//在这里,accum的值仍然是0,因为没有动作操作引起map被实际的计算.

二.Java和Scala版本的实战演示

2.1 Java版本:

/**
 * 实例:利用广播进行黑名单过滤!
 * 检查新的数据 根据是否在广播变量-黑名单内,从而实现过滤数据。
 */
public class BroadcastAccumulator {
 /**
  * 创建一个List的广播变量
  *
  */
 private static volatile Broadcast<List<String>> broadcastList = null;
 /**
  * 计数器!
  */
 private static volatile Accumulator<Integer> accumulator = null;
 public static void main(String[] args) {
  SparkConf conf = new SparkConf().setMaster("local[2]").
    setAppName("WordCountOnlineBroadcast");
  JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));

  /**
   * 注意:分发广播需要一个action操作触发。
   * 注意:广播的是Arrays的asList 而非对象的引用。广播Array数组的对象引用会出错。
   * 使用broadcast广播黑名单到每个Executor中!
   */
  broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));
  /**
   * 累加器作为全局计数器!用于统计在线过滤了多少个黑名单!
   * 在这里实例化。
   */
  accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");

  JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);

  /**
   * 这里省去flatmap因为名单是一个个的!
   */
  JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
   @Override
   public Tuple2<String, Integer> call(String word) {
    return new Tuple2<String, Integer>(word, 1);
   }
  });
  JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
   @Override
   public Integer call(Integer v1, Integer v2) {
    return v1 + v2;
   }
  });
  /**
   * Funtion里面 前几个参数是 入参。
   * 后面的出参。
   * 体现在call方法里面!
   *
   */
  wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
   @Override
   public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
    rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
     @Override
     public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
      if (broadcastList.value().contains(wordPair._1)) {
       /**
        * accumulator不仅仅用来计数。
        * 可以同时写进数据库或者缓存中。
        */
       accumulator.add(wordPair._2);
       return false;
      }else {
       return true;
      }
     };
     /**
      * 广播和计数器的执行,需要进行一个action操作!
      */
    }).collect();
    System.out.println("广播器里面的值"+broadcastList.value());
    System.out.println("计时器里面的值"+accumulator.value());
    return null;
   }
  });

  jsc.start();
  jsc.awaitTermination();
  jsc.close();
 }
 }

2.2 Scala版本

package com.Streaming
import java.util
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast
/**
 * Created by lxh on 2016/6/30.
 */
object BroadcastAccumulatorStreaming {
 /**
 * 声明一个广播和累加器!
 */
 private var broadcastList:Broadcast[List[String]] = _
 private var accumulator:Accumulator[Int] = _
 def main(args: Array[String]) {
 val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
 val sc = new SparkContext(sparkConf)
 /**
  * duration是ms
  */
 val ssc = new StreamingContext(sc,Duration(2000))
 // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
 broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
 accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")
 /**
  * 获取数据!
  */
 val lines = ssc.socketTextStream("localhost",9999)
 /**
  * 1.flatmap把行分割成词。
  * 2.map把词变成tuple(word,1)
  * 3.reducebykey累加value
  * (4.sortBykey排名)
  * 4.进行过滤。 value是否在累加器中。
  * 5.打印显示。
  */
 val words = lines.flatMap(line => line.split(" "))
 val wordpair = words.map(word => (word,1))
 wordpair.filter(record => {broadcastList.value.contains(record._1)})
 val pair = wordpair.reduceByKey(_+_)
 /**
  * 这个pair 是PairDStream<String, Integer>
  * 查看这个id是否在黑名单中,如果是的话,累加器就+1
  */
/* pair.foreachRDD(rdd => {
  rdd.filter(record => {
  if (broadcastList.value.contains(record._1)) {
   accumulator.add(1)
   return true
  } else {
   return false
  }
  })
 })*/
 val filtedpair = pair.filter(record => {
  if (broadcastList.value.contains(record._1)) {
   accumulator.add(record._2)
   true
  } else {
   false
  }
  }).print
 println("累加器的值"+accumulator.value)
 // pair.filter(record => {broadcastList.value.contains(record._1)})
 /* val keypair = pair.map(pair => (pair._2,pair._1))*/
 /**
  * 如果DStream自己没有某个算子操作。就通过转化transform!
  */
 /* keypair.transform(rdd => {
  rdd.sortByKey(false)//TODO
 })*/
 pair.print()
 ssc.start()
 ssc.awaitTermination()
 }
}

总结

以上就是本文关于Spark的广播变量和累加器使用方法代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:详解Java编写并运行spark应用程序的方法  、 Spark入门简介等,有什么问题可以随时留言,小编会及时回复大家。感谢朋友们对脚本之家网站的支持。

相关文章

  • Ingress七层路由机制实现域名的方式访问k8s

    Ingress七层路由机制实现域名的方式访问k8s

    这篇文章主要为大家介绍了Ingress七层路由机制实现域名的方式访问k8s内部应用,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步
    2022-03-03
  • ansible管理工具的环境及部署安装

    ansible管理工具的环境及部署安装

    这篇文章主要为大家介绍了管理工具ansible的环境安装及部署过程,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步早日升职加薪
    2022-03-03
  • V Rising 服务器搭建图文教程

    V Rising 服务器搭建图文教程

    这篇文章主要介绍了V Rising 服务器搭建,本文通过图文并茂的形式给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-06-06
  • idea新建maven项目时速度缓慢的解决方法

    idea新建maven项目时速度缓慢的解决方法

    下面小编就为大家分享一篇idea新建maven项目时速度缓慢的解决方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2017-11-11
  • Web服务器和应用服务器之间的区别详解

    Web服务器和应用服务器之间的区别详解

    这篇文章主要介绍了Web服务器和应用服务器之间的区别详解,应用服务器是为客户端提供对业务逻辑的访问这种服务器,根据客户端的请求会将数据转化为动态内容,一般还需要数据库的支持,应用服务器的搭建很多时候依赖于应用程序的开发语言,需要的朋友可以参考下
    2023-07-07
  • Windows10下hyperledger fabric1.4环境搭建过程图解

    Windows10下hyperledger fabric1.4环境搭建过程图解

    这篇文章主要介绍了Windows10下hyperledger fabric1.4环境搭建过程,本文图文并茂给大家介绍的非常详细,具有一定的参考借鉴价值,需要的朋友可以参考下
    2019-10-10
  • cwRsync提示password file must be owned by root when running as root的解决方法

    cwRsync提示password file must be owned by root when running as

    今天在配置服务器的时候,用了rsync4.10版本,客户端是2003服务器端是2008 r2 同步的时候提示password file must be owned by root when running as root问题,以前用老版本的时候没见过,还好看了下面的文章解决了,特分享下
    2015-08-08
  • 一文概括6种负载均衡技术的实现方式(小结)

    一文概括6种负载均衡技术的实现方式(小结)

    这篇文章主要介绍了一文概括6种负载均衡技术的实现方式(小结),小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
    2019-05-05
  • Elasticsearch6.2服务器升配后的bug(避坑指南)

    Elasticsearch6.2服务器升配后的bug(避坑指南)

    这篇文章主要介绍了Elasticsearch6.2服务器升配后的bug问题及解决方法,可以帮助有其他人避坑,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
    2022-09-09
  • HTTP常见的状态码HTTP Status Code

    HTTP常见的状态码HTTP Status Code

    这篇文章主要介绍了HTTP常见的状态码HTTP Status Code
    2017-01-01

最新评论