[Spark]-Streaming-调优

摘要:
1.概述SparkStreaming的主要应用方向是实时计算。这意味着SparkStreaming应用程序必须对执行性能和操作稳定性有一定要求(7x24)。2.在性能方面,主要是合理利用集群资源,设置正确的批大小(提高并行度)并减少每个批的处理时间(优化计算逻辑),以使数据流能够尽快被处理。2.1调整Spark配置参数2.2优化翼梁的数据接收

1.概述

  Spark Streaming的主要应用方向是实时计算.这代表一个Spark Streaming应用必然是对执行性能和运行稳定性(7 x 24)有一定要求的

2.性能

  在性能方面,主要是合理的利用的集群资源,设置正确的批处理大小(提升并行度)和减少每个批次的处理时间(计算逻辑优化).以让数据流处理的能像接受一样快

  2.1 调整Spark配置参数

    

  2.2 数据接收优化

    一个Spark Streaming 应用的开端便是数据接收,那么性能调优的第一步就是保证:数据不会在数据接收器端产生积压.

    2.2.2 及时的处理数据以防止数据积压

      如果批处理时间(batch processing time)超过批次间隔(batchinterval), 那么显然数据会不断的积压(receiver 的内存将会开始填满) , 最终会抛出 exceptions(最可能是 BlockNotFoundException).

      考虑提升并行度等方法,以加快批次处理速度.

      适度加大批次间隔,如果这样能及时在下个批次触发之前将数据处理完

           使用 SparkConf 配置 spark.streaming.receiver.maxRate,可以限制receiver的接受速率

    2.2.1 提升并行度  

      2.2.1.1 设置良好的执行任务并行数

        Spark Streaming 依然保持一个分区(partition)一个任务(task)的原则

        设置块间隔以影响任务数

          对于大多数接收器而言,接收到的数据在Spark都是合并在一起的,这称之为数据块(blocks of data).并依据块间隔对数据块分割分区(partition)

          所以对每个批次而言,批次中的块间隔决定了执行Map系的转换操作的任务数.(每个批次的任务数=批间隔(batch interval) / 块间隔(block interval))

            例如:批处理间隔2秒的Steaming应用,200毫秒的块间隔大约会创建10个任务(10=2000/200)

          如果执行的任务数太少(少于机器的可用核数),它将会是低效的.因为有空闲的核没有参与执行.

          如果执行的任务数太多,也是低效的.因为会产生大量的任务排队与任务启动开销.

          所以调整块间隔,以优化出一个合适的执行任务数是非常有必要的.

            配置项 : spark.streaming.blockInterval (默认200(毫秒)). (官方推荐的 block interval (块间隔)最小值约为 50ms , 低于此任务启动开销可能是一个问题)

        直接干预分区(partition)数以影响任务数

          通过调用 inputDstream.repartition(n) 来重新分区也可以.但这会产生一个Shuffle操作.      

      2.2.1.2 并行化数据接收

        每个 input DStream 都会创建一个接收数据流(single stream of data)和一个接收器(single receiver)

        因此, 可以通过创建多个 input DStreams 来实现接收多个数据流(Receiving multiple data streams) ,并配置用数据流的不同分区去接收数据源的不同数据段

        例如, 接收两个数据主题(two topics of data)的单个Kafka input DStream 可以分为两个输入流( Kafka input streams) ,每个只接收一个主题(kafka topic) topic

            这将运行两个接收器(receivers) 以并行的接受数据,从而提高总体吞吐量(overall throughput

            而这多个Input DStreams 可以连接在一起(Union),然后一起进行转换计算等处理.(这种Union不会影响分区(partition)数)

        如下:          

                    val numStreams = 5
                    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
                    val unifiedStream = streamingContext.union(kafkaStreams)
                    unifiedStream.print()

         注意: 并行数据接受,必须保证有足够多的核可以使用 (spark.cores.max)

  2.3 序列化的影响

    在一个Spark Streaming,在很多时候都会将数据序列化与反序列化,比如:

      i).输入数据

         默认会将接收器接受到的输入数据,通过 StorageLevel.MEMORY_AND_DISK_SER_2 存储到executor的内存或磁盘中.

         这个存储过程就会将输入数据序列化存入.此外,因为是序列化存入,则读取时也必要有一个反序列化过程.

      ii).缓存(persist)一个 DSream 

       流式计算过程中的某一个DSream,可能会需要缓存(临时存储在内存).而在流式计算,缓存模式使用StorageLevel.MEMORY_ONLY_SER,也是序列化操作. (Spark Core是StorageLevel.MEMORY_ONLY)

    总之,在一个Spark Streaming中,使用数据序列化的场合是非常多的.而在这时,序列化算法本身的性能就变得非常关键了.

    Spark 内置提供了 Kryo 序列化.相比Java内置的序列化可以提升一个数量级(10倍)以上. Kryo 唯一的要求时需要提前注册需序列化的类

      使用Kryo序列化:  conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))  详见Spark调优篇之序列化调优

   2.4 内存调优

    关于 Spark 内存调优的已经在Spark调优篇之内存调优讨论过,这里再针对Spark Streaming 应用进行一些讨论.

    2.4.1 对 Spark Streaming 应用,保证有足够的内存使用

      一个Spark Streaming 应用中所需要的内存,很大程序取决于转换(transformations)类型.

        例如:使用一个最近10分钟的窗口操作,那么集群中应该有足够的内存来容纳这10分钟的数据.

          或者要使用大量keys的UpdateStateByKey.则必然会使用大量内存.

          相反,简单的 map-filter-store 操作,则所需内存就会很低

      默认情况,Streaming会使用MEMORY_AND_DISK_SER_2.所以内存溢出的数据会写到磁盘中.这必然会降低应用的性能.

      因此,官方建议对Streaming应用一定要保证有足够的内存使用

     2.4.2 Spark Streaming 的GC

      对 Spark Streaming 内存调优的另一个方向是GC.对于需要低延迟的Spark Streaming 应用 , 肯定是不希望由 JVM GC 引起大量暂停的

      Spark Streaming 一些调整内存使用量和GC 的思路

      i).DStreams 的持久化

          如前所述,DSteam的持久化默认策略是 MEMORY_AND_DISK_SER_2 .序列化存储,已经极大的降低内存使用量和GC的次数.

          进一步,如果使用更好的序列化算法(例如Kryo),获得更快的序列化速度和更小的内存占用,也会进一步提升应用性能.

          再进一步,对序列化结果进行压缩,可以进一步缩小内存占用.(参见Spark配置 spark.rdd.compress)

          当然.这些优化都会耗用更多的CPU资源.

      ii).旧数据清理(Clearing old data)

        DSteam经过转换生成所有的 InputData 和 Persist RDD,将由 Spark 内部决定何时进行清除.

          例如 一个10分钟的窗口操作,Spark默认只会保留最近10分钟的数据,而更早的数据将会被自动清理.

        可以通过设置 streamingContext.remember 保持更长的持续时间(例如交互式查询旧数据) 

      iii).CMS垃圾收集器

          虽然已知 concurrent GC 可以提升系统的整体处理吞吐量,但仍然建议使用 concurrent mark-and-sweep(CMS) GC.

          强烈建议 在Driver 和 Executor 都设置 CMS GC 并设置统一的批处理时间, 以保持 GC 相关的暂停始终如一.

      iv).其它

        使用 OFF_HEAP 存储级别的持久化 RDDs

        使用更小的 heap sizes 的 executors.这将降低每个 JVM heap 内的 GC 压力

免责声明:文章转载自《[Spark]-Streaming-调优》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇BLDC 无刷电机FOC驱动 STM32官方培训资料spring/spring boot/spring mvc中用到的注解下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Redis系统学习之自定义RedisTemplate

自定义RedisTemplate 序列化源码分析 在JAVA程序中看到中文是没有问题的,但是在Redis客户端工具,也就是命令行中看见是编码的 继续分析源码 查看RedisTemplate.class 在RedisAutoConfiguration.class中点击 在上面可以看到序列化支持的 往下稍微滑动一些可以看到,默认采用的是JDK的序列化,...

Hadoop学习笔记—16.Pig框架学习

Hadoop学习笔记—16.Pig框架学习 一、关于Pig:别以为猪不能干活 1.1 Pig的简介 Pig是一个基于Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫Pig Latin,该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。Pig为复杂的海量数据并行计算提供了一个简单的操作和编程接口。 C...

入门系列-缓存机制介绍与使用

ABP框架扩展了ASP.NET Core的分布式缓存系统. Volo.Abp.Caching Package 默认情况下启动模板已经安装了这个包,所以大部分情况下你不需要手动安装. Volo.Abp.Caching是缓存系统的核心包.使用包管理控制台(PMC)安装到项目: Install-Package Volo.Abp.Caching 然后将AbpCa...

5.5Python数据处理篇之Sympy系列(五)---解方程

目录 目录 目录 前言 (一)求解多元一次方程-solve() 1.说明: 2.源代码: 3.输出: (二)解线性方程组-linsolve() 1.说明: 2.源代码: 3.输出: (三)解非线性方程组-nonlinsolve() 1.说明: 2.源代码: 3.输出: (四)求解微分方程-dsolve() 1.说明: 2.源代码:...

rest-framework框架——视图三部曲

一、mixins类编写视图 1、配置url urlpatterns = [ ... re_path(r'^authors/$', views.AuthorView.as_view(), name="author"), re_path(r'^authors/(?P<pk>d+)/$', views.AuthorDet...

Spark官方调优文档翻译(转载)

Spark调优由于大部分Spark计算都是在内存中完成的,所以Spark程序的瓶颈可能由集群中任意一种资源导致,如:CPU、网络带宽、或者内存等。最常见的情况是,数据能装进内存,而瓶颈是网络带宽;当然,有时候我们也需要做一些优化调整来减少内存占用,例如将RDD以序列化格式保存(storing RDDs in serialized form)。本文将主要涵盖...