spark streaming之三 rdd,job的动态生成以及动态调度

摘要:
上一篇文章讨论了DAG静态模板的生成。每次批处理时间一到,Sparkstream就会根据DAG形成的逻辑和物理依赖链动态生成RDD和由这些RDD组成的作业,并形成一个作业集合,提交给集群执行。然后让我们详细分析这三个步骤。JobExecutor是一个名为流式作业执行器的线程池,JobHandler是从Runnable=Null){_eventLoop.post}}继承的线程类,否则{//JobSchedulerhasbeenstoped.}}}最后{ssc.sparkContext.setLocalProperties}}}}ViewCode要了解的其他两个对象是jobGenerator和receiverTracker。jobGenerator负责动态生成作业,receiverTracker负责接收数据源和随后的转换,以及基于这些转换形成DAG模板。

前面一篇讲到了,DAG静态模板的生成。那么spark streaming会在每一个batch时间一到,就会根据DAG所形成的逻辑以及物理依赖链(dependencies)动态生成RDD以及由这些RDD组成的job,并形成一个job集合提交到集群当中执行。那么下面我们具体分析这三个步骤。

首先从JobScheduler讲起。在本节所需要了解的是JobScheduler的两个重要对象。jobExecutor与JobHandler。jobExecutor是一个名为streaming-job-executor的线程池,JobHandler是一个继承自Runnable的线程类。提交过来的JOB将提交到到这里执行。

private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
spark streaming之三 rdd,job的动态生成以及动态调度第1张spark streaming之三 rdd,job的动态生成以及动态调度第2张
 private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="http://t.zoukankan.com/$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
        // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
        // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
        ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }
  }
}
View Code

另外两个需要了解的对象,jobGenerator以及receiverTracker。jobGenerator负责job的动态生成,receiverTracker负责数据源的接收以及接收以后的transformation,以及根据这些转换形成DAG模板。随着JobScheduler启动的时候,jobGenerator以及receiverTracker也将启动。

spark streaming之三 rdd,job的动态生成以及动态调度第3张spark streaming之三 rdd,job的动态生成以及动态调度第4张
 def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }

    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    executorAllocationManager.foreach(ssc.addStreamingListener)
    receiverTracker.start()//
    jobGenerator.start()//
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }
View Code

spark streaming之三 rdd,job的动态生成以及动态调度第5张

JobGenerator 启动

本节先按下 receiverTracker不表。先说jobGenerator。我们首先来看看它的start函数。首先启动一个待命的线程。然后根据上次的spark streaming任务是否执行了checkpoint来决定是执行restart()还是startFirstTime()。

/** Start generation of jobs */
  def start(): Unit = synchronized {
    ......
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }

因为只为弄清流程原理,我们只看第一次启动的情况。

 /** Starts the generator for the first time */
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

首先graph.start(startTime - graph.batchDuration)传递一个时间参数给DStreamGraph,告知其batch启动时间,并初始化相关参数。然后启动定时器。

RecurringTimer

代码如下

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

其实就是往eventLoop线程里加入GenerateJobs。即定时根据DAG模板生成当前batch的DAG实例。注意这里是jobGenerator的GenerateJobs

jobGenerator之GenerateJobs

/** Generate jobs and perform checkpointing for the given `time`.  */
private def generateJobs(time: Time) {
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time)                 
    graph.generateJobs(time)                                                
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) 
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))    
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))       
}

它做了五件事情。

首先,通知jobScheduler获取当前batch需要处理的数据。

然后调用DStreamGraph的GenerateJobs函数真正去执行操作。

第三,将第一步获取到的数据保存在inputInfoTracker中。这部份被称之为元数据。什么叫元数据,就是最开始没有经过各种转换的数据。

第四,将生成的RDD实例以及元数据一同提交给jobScheduler。这部份是提交到jobExecutor这个线程池里异步执行的。

再然后后将一个checkpoint任务异步交给eventLoop去执行。

InputinfoTracker

可能有人会发现上面的表述中,第一和第三步有点雷同。但其实它们是不一样的。

我们追踪jobScheduler.receiverTracker.allocateBlocksToBatch(time)最终得到的数据结构为:

private[streaming] case class ReceivedBlockInfo(
streamId: Int,
numRecords: Option[Long],
metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
)

jobScheduler.inputInfoTracker.getInfo(time)最终得到的数据结构为:

case class StreamInputInfo(
    inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty) {
  require(numRecords >= 0, "numRecords must not be negative")

  def metadataDescription: Option[String] =
    metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
}

对比发现,两种数据结构都有streamId(inputStreamId),numRecords,metadataOption,区别在于blockStoreResult。而且第三步获取到的元数据有句注释:Map to track all the InputInfo related to specific batch time and input stream。意为跟踪所有输入流信息和处理记录号的跟踪器。

// Map to track all the InputInfo related to specific batch time and input stream.
private val batchTimeToInputInfos =
new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

InputInfoTracker的注释是This class manages all the input streams as well as their input data statistics. The information will be exposed through StreamingListener for monitoring.还有,StreamInputInfo注解为@DeveloperAPI

我的理解是第一步所做的是获取到数据。第三步所做的是给这些数据加上属性,提供给开发者查询展示监控。

到目前为止,我们分析了动态调度的流程,整个流程如图:

spark streaming之三 rdd,job的动态生成以及动态调度第6张

DAG实例生成

 上面仅仅是分析了动态调度的问题,而DAG实例究竟是怎样生成的?

spark streaming之三 rdd,job的动态生成以及动态调度第7张

总结

spark streaming之三 rdd,job的动态生成以及动态调度第8张

到这里为止,我们根据DAG模板拿到了DAG实例,以及数据。那么接下来,会根据DAGScheduler划分task,stage。

免责声明:文章转载自《spark streaming之三 rdd,job的动态生成以及动态调度》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇如何在CentOS 6.4上安装并使用OpenVZ?Mybatis框架基础支持层——反射工具箱之实体属性Property工具集(6)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

spark作业性能调优

spark作业性能调优 优化的目标 保证大数据量下任务运行成功 降低资源消耗 提高计算性能 一、开发调优: (1)避免创建重复的RDD RDD lineage,也就是“RDD的血缘关系链” 开发RDD lineage极其冗长的Spark作业时,创建多个代表相同数据的RDD,进而增加了作业的性能开销。 (2)尽可能复用同一个RDD 比如说,有一个RDD的...

Spark SQL概述

前言:一些逻辑用spark core 来写,会比较麻烦,如果用sql 来表达,那简直太方便了 一、Spark SQL 是什么 是专门处理结构化数据的 Spark 组件 Spark SQL 提供了两种操作数据的方法:   sql 查询   DataFrames/Datasets API Spark SQL = Schema + RDD 二、Spark SQL...

Mastering-Spark-SQL学习笔记02 SparkSession

SparkSession是在使用类型化数据集(或基于非类型化Row-基于DataFrame)数据抽象开发Spark SQL应用程序时创建的首批对象之一。 在Spark 2.0中,SparkSession将SQLContext和HiveContext合并到一个对象中。 使用SparkSession.builder方法来创建一个SparkSession实例,使...

spark基础知识(1)

一、大数据架构   并发计算: 并行计算: 很少会说并发计算,一般都是说并行计算,但是并行计算用的是并发技术。并发更偏向于底层。并发通常指的是单机上的并发运行,通过多线程来实现。而并行计算的范围更广,他是散布到集群上的分布式计算。 Spark内存计算比hadoop快100倍,磁盘计算快10倍,在worker节点主要基于内存进行计算,避免了不必要的磁盘io。...

Spark大型电商项目实战-及其改良(1) 比对sparkSQL和纯RDD实现的结果

代码存在码云:https://coding.net/u/funcfans/p/sparkProject/git 代码主要学习https://blog.csdn.net/u012318074/article/category/6744423/1这里的 发现样例作为正式项目来说效率太低了,为了知识点而知识点.对原代码做了一定优化 第1个项目:用户访问sessi...

大数据--Spark原理

Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一,与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势: 1.运行速度快,Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供...