• Spark踩坑记——初试

  • Spark踩坑记——数据库(Hbase+Mysql)

  • Spark踩坑记——Spark Streaming+kafka应用及调优
    在前面总结的几篇spark踩坑博文中,我总结了自己在使用spark过程当中踩过的一些坑和经验。我们知道Spark是多机器集群部署的,分为Driver/Master/Worker,Master负责资源调度,Worker是不同的运算节点,由Master统一调度,而Driver是我们提交Spark程序的节点,并且所有的reduce类型的操作都会汇总到Driver节点进行整合。节点之间会将map/reduce等操作函数传递一个独立副本到每一个节点,这些变量也会复制到每台机器上,而节点之间的运算是相互独立的,变量的更新并不会传递回Driver程序。那么有个问题,如果我们想在节点之间共享一份变量,比如一份公共的配置项,该怎么办呢?Spark为我们提供了两种特定的共享变量,来完成节点间变量的共享。
    本文首先简单的介绍spark以及spark streaming中累加器和广播变量的使用方式,然后重点介绍一下如何更新广播变量。

累加器

顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型,在2.0.0之前的版本中,通过继承AccumulatorParam来实现,而2.0.0之后的版本需要继承AccumulatorV2来实现自定义类型的累加器。
如果创建了一个具名的累加器,它可以在spark的UI中显示。这对于理解运行阶段(running stages)的过程有很重要的作用。如下图:
万码学堂,电脑培训,计算机培训,Java培训,JavaEE开发培训,青岛软件培训,软件工程师培训
在2.0.0之前版本中,累加器的声明使用方式如下:

scala> val accum = sc.accumulator(0, "My Accumulator")accum: spark.Accumulator[Int] = 0scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.valueres2: Int = 10

累加器的声明在2.0.0发生了变化,到2.1.0也有所变化,具体可以参考官方文档,我们这里以2.1.0为例将代码贴一下:

scala> val accum = sc.longAccumulator("My Accumulator")accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc
        
		

网友评论