spark教程(14)-共享变量

摘要:
Spark使用共享架构。数据分布在不同的节点中。每个节点都有自己的CPU和内存。没有全局内存可用于共享变量。例如,如果RDD操作使用驱动程序中的变量,Spark将向执行器发送变量和任务的副本。该变量的更新仅存在于任务内部,不会返回给驱动程序;如果任务分为多个阶段,驱动程序将发送变量

spark 使用的架构是无共享的,数据分布在不同节点,每个节点有独立的 CPU、内存,不存在全局的内存使得变量能够共享,驱动程序和任务之间通过消息共享数据

举例来说,如果一个 RDD 操作使用了驱动程序中的变量,spark 会将这个变量的副本和 task 一起发送给 executor 中的执行者,对该变量的更新只存在于 task 的内部,并不会回传给驱动程序;

如果这个任务分为多个阶段,每个阶段开始时,驱动程序会把 变量 发送给 worker;

在实际场景中,驱动程序在 task 间共享一个巨大(如 100M)的查找表,并且该 task 有多个(如 10个)阶段,spark 会在每个阶段开始时,给每个 task 发送一份数据,那就是 1000M 的传输和存储

显然浪费资源且效率低下;

除此之外,有时我们需要不同节点的多个任务更新一个全局变量,然而这个变量并不回传给驱动程序,又如何同步更新呢?

共享变量

spark 提供了共享变量的概率满足上述需求

spark 提供了两种共享变量:广播变量 和 累加器

spark教程(14)-共享变量第1张

上图描述了共享变量的工作原理

共享变量的作用在于 减少网络传输,减少内存消耗;

其实是一种 spark 编程的优化方法

spark 三大数据结构

RDD:分布式数据集

广播变量:分布式只读共享变量

累加器:分布式只写共享变量

广播变量

spark 只会给 worker 发送一次广播变量(序列化的),并且将它反序列化后存储在 executor 的内存中;

如果任务分为多个阶段,且每个阶段使用相同的变量,那么广播变量就无需每个阶段都传输数据,spark 会将传输过来的数据序列化后存储在 executor 的内存中,在任务开始前反序列化广播变量即可;

 

使用场景

1. 需要共享一个很大的数据集

2. 任务分为多个阶段,每个阶段使用相同的数据

使用方法

SparkContext.broadcast 方法用于创建一个广播变量;

broadcast 输入一个普通变量,返回一个 Broadcast 实例;

调用 Broadcast 实例的 value 属性可以获取变量值;

 

实例分析

import time
from pyspark import SparkContext
factor1 = ['sssssssssssssssssss']*100000       # 待广播的变量
factor2 = range(100000)

if __name__ == '__main__':
    time.clock()
    sc = SparkContext()

    listRdd = sc.parallelize(factor2)

    ### 正确利用广播变量          # 测试:1.40s
    brodacastvalue = sc.broadcast(factor1)
    out = listRdd.map(lambda s: brodacastvalue.value[s-1]).collect()
    
    ### 不正确利用广播变量          # 测试:1.59s
    brodacastvalue = sc.broadcast(factor1).value        
    out = listRdd.map(lambda s: brodacastvalue[s-1]).collect()  # 这里注意了,并没有调用 广播变量,而是调用了它的 value,相当于直接调用了原来的数据

    ### 不利用广播变量         # 测试:1.59s
    out = listRdd.map(lambda s: factor1[s-1]).collect()

    print out
    print(time.clock())

可以看到广播变量优化了效率,特别是替代 join 操作 

累加器

累加器使得多个任务能够操作同一个全局变量, 并且能够回传给驱动程序;

它常用于计数、求和和聚合操作;

spark 支持数值型的累加,也支持自定义类型的累加;

使用方法

SparkContext.accumulator 创建一个累加器变量;

它接受两个参数:累加器的初值,累加器的名字,在 spark UI 中可看到,第二个参数可选

在 task 中只能使用累加器变量的 add 属性或者 += 来更新该变量;

在 驱动程序 中才能调用累加器变量的 value 属性,在 task 中不可以;

实例

from pyspark import SparkContext

sc = SparkContext(appName='leijiaqi')
accum = sc.accumulator(10)      # 初值,名字
# sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))     # 调用 add 属性更新变量

def myadd(x):       
    global accum
    accum+=x
    return accum
sc.parallelize([1, 2, 3, 4]).foreach(myadd)     # 使用 += 方法更新变量
print(accum.value)  # 20  = 10+1+2+3+4

# sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.value)        # 在 task 中调用 value 报错:Exception: Accumulator.value cannot be accessed inside tasks

总结

累加器和广播变量使用非常复杂,本文仅介绍基本用法,二者结合使用可以满足很多复杂场景

参考资料:

https://blog.csdn.net/chongxin1/article/details/78048134

https://www.jianshu.com/p/687db128ff2f  python 版

免责声明:文章转载自《spark教程(14)-共享变量》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇bzoj3110 [Zjoi2013]K大数查询Android内核模块编译执行下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

DevExpress Grid使用checkBox选中的方法

到官网得到消息自13.2版本后的Dev Grid中均内置了CheckBox列多选功能。在寻找答案的过程的成果进行记录。 一、13.2版本以后用法 启用多选列对Gird中的View进行以下属性设置:gridView1.OptionsSelection.MultiSelect = true; gridView1.OptionsSelection.Mult...

ASP.Net Core 中使用Zookeeper搭建分布式环境中的配置中心系列一:使用Zookeeper.Net组件演示基本的操作

前言:马上要过年了,祝大家新年快乐!在过年回家前分享一篇关于Zookeeper的文章,我们都知道现在微服务盛行,大数据、分布式系统中经常会使用到Zookeeper,它是微服务、分布式系统中必不可少的分布式协调框架。它的作用体现在分布式系统中解决了配置中心的问题,以及解决了在分布式环境中不同进程之间争夺资源的问题,也就是分布式锁的功能以及分布式消息队列功能等...

tomacat7.0配置(windows)

windows 版Tomcat 7.0的配置 一、安装JDK 1.7 1、添加环境变量:在 我的电脑->属性->高级->环境变量 2、新建系统变量,变量名:JAVA_HOME 变量值:C:\Program Files\Java\jdk1.7.0 (JDK的安装目录) 3、在原有的系统变量 Path后面加上英文分号,再添加%JAVA_HOM...

android极光杀掉程序收不到通知

http://docs.jpush.io/guideline/faq/#android 第三方系统收不到推送的消息 由于第三方 ROM 的管理软件需要用户手动操作 小米【MIUI】 自启动管理:需要把应用加到【自启动管理】列表,否则杀进程或重新开机后进程无法开启 通知栏设置:应用默认都是显示通知栏通知,如果关闭,则收到通知也不会提示 网络助手:可以...

Go操作kafka

目录 一、sarama 1.1 下载及安装 1.2 注意事项 二、连接kafka发送消息 三、连接kafka消费消息 更新、更全的《Go从入门到放弃》的更新网站,更有python、go、人工智能教学等着你:https://www.cnblogs.com/nickchen121/p/11517502.htmlKafka是一种高吞吐量的分布式发布...

Robot Framework操作

Robot Framework 介绍 RobotFramework是一款基于python的开源自动化测试框架,遵守Apache License 2.0协议,在此协议下所有人都可以免费开发和使用。因为Robot Framework 是灵活和可扩展的,所以它很合适用于测试具有多种接口的复杂软件:用户接口,命令行,web service,编程接口等。RF提供很多...