spark streaming job生成与运行

摘要:
SparkStreamingjob生成SparkStreaming每次提交作业时将提交多少个作业?

spark streaming job生成

spark Streaming每次提交job的时候,会提交几个呢?

DStreamGraph#
  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

分别根据每个outputStream生成job,也就是说有多少个outputStream,就会有多少job

outputStream如何生成呢

DStream####
  private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

这里通过register方法来注册outputStream

DStream#
  /**
   * Register this streaming as an output stream. This would ensure that RDDs of this
   * DStream will be generated.
   */
  private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }

体现在代码级别上,每执行一个foreach方法,提交job的时候就会有新增一个job,如果整个应用中没有foreach,也就是说没有 outputStream的话,会触发异常。

DStreamGraph#
  def validate() {
    this.synchronized {
      require(batchDuration != null, "Batch duration has not been set")
      // assert(batchDuration >= Milliseconds(100), "Batch duration of " + batchDuration +
      // " is very low")
      require(getOutputStreams().size > 0, "No output operations registered, so nothing to execute")
    }
  }

生成job

DStream#
   * Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   */
  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }

此处有jobFunc,直接调用的spark的runJob方法,runJob的分析,可以参考我另一篇博客。

job提交

JobGenerator#
  /** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      //此处生成job
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        //此处提交job
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

JobScheduler#
  def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

内部有线程池,提交JobHandler

  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      try {
			....
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          //此处就是job的最终运行的地方
            job.run()
          }
          ....
    }
  }
Job#
  def run() {
    _result = Try(func())
  }

//此处func方法就是在job生成时的jobFunc,调用的runJob方法。

免责声明:文章转载自《spark streaming job生成与运行》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇WPF 多语言解决方案【踩坑日记】记一次静态导入引起Lombok失效,导致编译失败的惨案下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

随便看看

R8500 MPv2 版本 刷梅林改版固件

R8500很复杂。此外,中国koolshare上有人发布了Merlin的修改和移植固件。主要原因是科学上网更方便,所以R8500被用作Merlin固件。这是我第一次使用Merlin固件。Koolshare的官方文件服务器:http://firmware.koolshare.cn/merlin_8wan_firmware/R8500/X7.4/官方固件刷到Me...

codeforces 765 F Souvenirs 线段树+set

问题的含义:多个查询的间隔中两个数字之差的绝对值的最小值:可以根据查询的l对脱机查询进行排序,并且可以从r到l进行反向查询,并且间隔i+1到n的每次更新都可以确保此更新不会影响下一次和后续更新。因为当两个区间相互覆盖时,具有较小l的区间的值必须小于或等于另一个区间,因此可以绘制一个图来理解。...

Idea常用插件整合

官方网站:https://plugins.jetbrains.com/plugin/228-sql-query-plugin6.IdeaVim基于IntelliJ的Vim仿真插件。注意:如果打开WebInspector,那么CSS/JavaScript同步和元素高亮显示不起作用“pluginisdebuggingthistab”信息栏的可用性问题官方网站:h...

怎么使用vscode合并分支

1.切换分支到本地开发2.代码完成后提交代码到本地仓库3.切换分支到需要merge的test分支先pull一下,之后再合并分支—我开发的是这个feature,就合并这个分支4.当合并分支后,需要重新提交到远程:点击一下,直接提交...

java报表实现excel一样冻结表头的功能

增加了几个新的指标,后台sql改了,拿过来只须在一个dao类中修改就足够了,可恨的是客户又提出来改报表表样,加个类似excel冻结表头的功能。...

Oracle 12c新特性(For DBA)

2: Oracle12cIn-MemoryOracle12cIn-Memory提供了一种独特的双格式体系结构,它可以使用传统的行格式和新的内存列格式同时在内存中存储表。与其他NOSQL分片结构相比,OracleSharding提供了优异的运行时性能和更简单的生命周期管理。OracleSharding使用GDS体系结构自动部署和管理分片和复制技术。GDS还提供...