Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解

摘要:
然而,在使用MapPartitions操作之后,一个任务将只执行一次函数,并且该函数将一次接收所有分区数据。例如,原来的四个分区现在可以更改为两个分区。合并运算符主要用于在过滤操作后每个分区的数据量不同时压缩分区的数量。减少分区数量,并使每个分区的数据量尽可能均匀和紧凑。一组数据的比较:在生产环境中,一个分区大约是1000个foreach。使用foreachPartition,性能可提高2-3分钟。

一.算子调优之MapPartitions提升Map类操作性能

1.MapPartitions操作的优点:

如果是普通的map,比如一个partition中有1万条数据;ok,那么你的function要执行和计算1万次。

Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解第1张

但是,使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。

Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解第2张

2.MapPartitions的缺点:一定是有的。

如果是普通的map操作,一次function的执行就处理一条数据;那么如果内存不够用的情况下,比如处理了1千条数据了,那么这个时候内存不够了,那么就可以将已经处理完的1千条数据从内存里面垃圾回收掉,或者用其他方法,腾出空间来吧。

所以说普通的map操作通常不会导致内存的OOM异常。

但是MapPartitions操作,对于大量数据来说,比如甚至一个partition,100万数据,一次传入一个function以后,那么可能一下子内存不够,但是又没有办法去腾出内存空间来,可能就OOM,内存溢出。

3.什么时候比较适合用MapPartitions系列操作?

就是说,数据量不是特别大的时候,都可以用这种MapPartitions系列操作,性能还是非常不错的,是有提升的。比如原来是15分钟,(曾经有一次性能调优),12分钟。10分钟->9分钟。

但是也有过出问题的经验,MapPartitions只要一用,直接OOM,内存溢出,崩溃。

在项目中,自己先去估算一下RDD的数据量,以及每个partition的量,还有自己分配给每个executor的内存资源。看看一下子内存容纳所有的partition数据,行不行。如果行,可以试一下,能跑通就好。性能肯定是有提升的。

但是试了一下以后,发现,不行,OOM了,那就放弃吧。

二.算子调优之filter过后使用coalesce减少分区数量

(1)在经过filter之后,通常会造成各个partition中的数据数量相差过大

Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解第3张

默认情况下,经过了这种filter之后,RDD中的每个partition的数据量,可能都不太一样了。(原本每个partition的数据量可能是差不多的)

问题:

1、每个partition数据量变少了,但是在后面进行处理的时候,还是要跟partition数量一样数量的task,来进行处理;有点浪费task计算资源。

2、每个partition的数据量不一样,会导致后面的每个task处理每个partition的时候,每个task要处理的数据量就不同,这个时候很容易发生什么问题?数据倾斜。。。。

比如说,第二个partition的数据量才100;但是第三个partition的数据量是900;那么在后面的task处理逻辑一样的情况下,不同的task要处理的数据量可能差别达到了9倍,甚至10倍以上;同样也就导致了速度的差别在9倍,甚至10倍以上。

这样的话呢,就会导致有些task运行的速度很快;有些task运行的速度很慢。这,就是数据倾斜。

(2)针对上述的两个问题,我们希望应该能够怎么样?

1、针对第一个问题,我们希望可以进行partition的压缩吧,因为数据量变少了,那么partition其实也完全可以对应的变少。比如原来是4个partition,现在完全可以变成2个partition。那么就只要用后面的2个task来处理即可。就不会造成task计算资源的浪费。(不必要,针对只有一点点数据的partition,还去启动一个task来计算)

2、针对第二个问题,其实解决方案跟第一个问题是一样的;也是去压缩partition,尽量让每个partition的数据量差不多。那么这样的话,后面的task分配到的partition的数据量也就差不多。不会造成有的task运行速度特别慢,有的task运行速度特别快。避免了数据倾斜的问题。

有了解决问题的思路之后,接下来,我们该怎么来做呢?实现?

(3)coalesce算子

主要就是用于在filter操作之后,针对每个partition的数据量各不相同的情况,来压缩partition的数量。减少partition的数量,而且让每个partition的数据量都尽量均匀紧凑。

从而便于后面的task进行计算操作,在某种程度上,能够一定程度的提升性能。

三.算子调优之使用foreachPartition优化写数据库性能

(1)传统的foreach写数据库过程

Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解第4张

默认的foreach的性能缺陷在哪里?

首先,对于每条数据,都要单独去调用一次function,task为每个数据,都要去执行一次function函数。

如果100万条数据,(一个partition),调用100万次。性能比较差。

另外一个非常非常重要的一点

如果每个数据,你都去创建一个数据库连接的话,那么你就得创建100万次数据库连接。

但是要注意的是,数据库连接的创建和销毁,都是非常非常消耗性能的。虽然我们之前已经用了数据库连接池,只是创建了固定数量的数据库连接。

你还是得多次通过数据库连接,往数据库(MySQL)发送一条SQL语句,然后MySQL需要去执行这条SQL语句。如果有100万条数据,那么就是100万次发送SQL语句。

以上两点(数据库连接,多次发送SQL语句),都是非常消耗性能的。

(2)使用foreachPartition

Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解第5张

(3)用了foreachPartition算子之后,好处在哪里?

1、对于我们写的function函数,就调用一次,一次传入一个partition所有的数据

2、主要创建或者获取一个数据库连接就可以

3、只要向数据库发送一次SQL语句和多组参数即可


在实际生产环境中,清一色,都是使用foreachPartition操作;但是有个问题,跟mapPartitions操作一样,如果一个partition的数量真的特别特别大,比如真的是100万,那基本上就不太靠谱了。

一下子进来,很有可能会发生OOM,内存溢出的问题。

一组数据的对比:生产环境

一个partition大概是1千条左右
用foreach,跟用foreachPartition,性能的提升达到了2~3分钟。

四.算子调优之使用repartition解决Spark SQL低并行度的性能问题

并行度:之前说过,并行度是自己可以调节,或者说是设置的。

1、spark.default.parallelism
2、textFile(),传入第二个参数,指定partition数量(比较少用)

咱们的项目代码中,没有设置并行度,实际上,在生产环境中,是最好自己设置一下的。官网有推荐的设置方式,你的spark-submit脚本中,会指定你的application总共要启动多少个executor,100个;每个executor多少个cpu core,2~3个;总共application,有cpu core,200个。

官方推荐,根据你的application的总cpu core数量(在spark-submit中可以指定,200个),自己手动设置spark.default.parallelism参数,指定为cpu core总数的2~3倍。400~600个并行度。600。

承上启下

你设置的这个并行度,在哪些情况下会生效?哪些情况下,不会生效?
如果你压根儿没有使用Spark SQL(DataFrame),那么你整个spark application默认所有stage的并行度都是你设置的那个参数。(除非你使用coalesce算子缩减过partition数量)

问题来了,Spark SQL,用了。用Spark SQL的那个stage的并行度,你没法自己指定。Spark SQL自己会默认根据hive表对应的hdfs文件的block,自动设置Spark SQL查询所在的那个stage的并行度。你自己通过spark.default.parallelism参数指定的并行度,只会在没有Spark SQL的stage中生效。

比如你第一个stage,用了Spark SQL从hive表中查询出了一些数据,然后做了一些transformation操作,接着做了一个shuffle操作(groupByKey);下一个stage,在shuffle操作之后,做了一些transformation操作。hive表,对应了一个hdfs文件,有20个block;你自己设置了spark.default.parallelism参数为100。

你的第一个stage的并行度,是不受你的控制的,就只有20个task;第二个stage,才会变成你自己设置的那个并行度,100。

问题在哪里?

Spark SQL默认情况下,它的那个并行度,咱们没法设置。可能导致的问题,也许没什么问题,也许很有问题。Spark SQL所在的那个stage中,后面的那些transformation操作,可能会有非常复杂的业务逻辑,甚至说复杂的算法。如果你的Spark SQL默认把task数量设置的很少,20个,然后每个task要处理为数不少的数据量,然后还要执行特别复杂的算法。

Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解第6张

这个时候,就会导致第一个stage的速度,特别慢。第二个stage,1000个task,刷刷刷,非常快。

解决上述Spark SQL无法设置并行度和task数量的办法,是什么呢?

repartition算子,你用Spark SQL这一步的并行度和task数量,肯定是没有办法去改变了。但是呢,可以将你用Spark SQL查询出来的RDD,使用repartition算子,去重新进行分区,此时可以分区成多个partition,比如从20个partition,分区成100个。

然后呢,从repartition以后的RDD,再往后,并行度和task数量,就会按照你预期的来了。就可以避免跟Spark SQL绑定在一个stage中的算子,只能使用少量的task去处理大量数据以及复杂的算法逻辑。

 Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解第7张

五.算子调优之reduceByKey本地聚合介绍

reduceByKey,相较于普通的shuffle操作(比如groupByKey),它的一个特点,就是说,会进行map端的本地聚合。

对map端给下个stage每个task创建的输出文件中,写数据之前,就会进行本地的combiner操作,也就是说对每一个key,对应的values,都会执行你的算子函数() + _)

(1)用reduceByKey对性能的提升:

1、在本地进行聚合以后,在map端的数据量就变少了,减少磁盘IO。而且可以减少磁盘空间的占用。

2、下一个stage,拉取数据的量,也就变少了。减少网络的数据传输的性能消耗。

3、在reduce端进行数据缓存的内存占用变少了。

4、reduce端,要进行聚合的数据量也变少了。

(2)总结:

reduceByKey在什么情况下使用呢?

1、非常普通的,比如说,就是要实现类似于wordcount程序一样的,对每个key对应的值,进行某种数据公式或者算法的计算(累加、类乘)

2、对于一些类似于要对每个key进行一些字符串拼接的这种较为复杂的操作,可以自己衡量一下,其实有时,也是可以使用reduceByKey来实现的。但是不太好实现。如果真能够实现出来,对性能绝对是有帮助的。(shuffle基本上就占了整个spark作业的90%以上的性能消耗,主要能对shuffle进行一定的调优,都是有价值的)

Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解第8张

免责声明:文章转载自《Spark 算子调优:MapPartitions+coalesce+foreachPartition+repartition+reduceByKey详解》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇WebUi环境安装Zynq 7020笔记之 GPIO MIO 和EMIO的学习下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Java 8 (10) CompletableFuture:组合式异步编程

https://www.cnblogs.com/baidawei/p/9447737.html   随着多核处理器的出现,提升应用程序的处理速度最有效的方式就是可以编写出发挥多核能力的软件,我们已经可以通过切分大型的任务,让每个子任务并行运行,使用线程的方式,分支/合并框架(java 7) 和并行流(java 8)来实现。 现在很多大型的互联网公司都对外...

Spark History Server配置使用

Spark history Server产生背景 以standalone运行模式为例,在运行Spark Application的时候,Spark会提供一个WEBUI列出应用程序的运行时信息;但该WEBUI随着Application的完成(成功/失败)而关闭,也就是说,Spark Application运行完(成功/失败)后,将无法查看Application...

Hadoop入门知识总结

一、大数据1.含义 大数据指在一定时间范围内使用常规的软件无法处理的数据集合!2.特点 ①海量 ②高增长率 ③多样性 ④低价值密度二、Hadoop1.含义 狭义: Hadoop只代表hadoop框架本身! 广义: hadoop代表整个hadoop体系,由hadoop框架和其他依赖于hadoop的其他框架共同组成!2.hadoop的组成2.x版本 HDFS:...

Asp.net 面向接口可扩展框架之数据处理模块及EntityFramework扩展和Dapper扩展(含干货)

接口数据处理模块是什么意思呢?实际上很简单,就是使用面向接口的思想和方式来做数据处理。 还提到EntityFramework和Dapper,EntityFramework和Dapper是.net环境下推崇最高的两种ORM工具。 1、EntityFramework是出自微软根正苗红的.net下的ORM工具,直接在Vs工具和Mvc框架中集成了,默认生成的项目就...

源码分析Kafka 消息拉取流程

本节重点讨论 Kafka 的消息拉起流程。 @ 目录 1、KafkaConsumer poll 详解 1.1 KafkaConsumer updateAssignmentMetadataIfNeeded 详解 1.1.1 ConsumerCoordinator#poll 1.1.2 updateFetchPositions 详解 1.2 消息拉...

《特征工程三部曲》之一:数据处理

要理解特征工程,首先要理解数据(Data)和特征(Feature)的概念 概念 特征工程(Feature Engineering) 其本质上是一项工程活动,它目的是最大限度地从原始数据中提取特征以供算法和模型使用。 特征工程在数据挖掘中有举足轻重的位置 数据领域一致认为:数据和特征决定了机器学习的上限,而模型和算法只能逼近这个上限而已。 特征工程...