wordCount的执行流程

摘要:
我们可以以rdd.toDebugString的形式读取数据(在这一步之后,rdd Lineage将记录rdd的元数据信息和转换行为,它可以根据这些信息重新计算和恢复丢失的数据分区。内部实施细节1。TextFile将生成两个RDD(值),我们称之为map方法,map的作用是取消键。

  我们对于wordCount的这个流程,在清晰不过了,不过我们在使用spark以及hadoop本身的mapReduce的时候,我们是否理解其中的原理呢,今天我们就来介绍一下wordCount的执行原理,

  1.首先我们都会这样子执行(wordCount执行在hadoop中)

  val rdd = sc.textFile("hdfs://weekday01:9000/wc").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

  rdd.saveAsTextFile("hdfs://weekday01:9000/out")

  如果我们此时想看依赖的关系的话,我们可以这样操作

  rdd.toDebugString(执行完这一步操作之后,你就可以看到你在hadoop中执行wordCount的这个过程,中间到底生成了

  多少个rdd)

  2.rdd与rdd的一些依赖关系

    其实在我们每一次生成rdd的时候,都是由于后面的rdd要依赖前面的rddwordCount的执行流程第1张

  3.Lineage(血统)

  RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage记录下来,以便恢复丢失

  的分区,

  此时rdd不会记录这个血统中的各个rdd的具体的值是多少。RDD的Lineage会记录RDD的元数据信息和转换行为,

  当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区

  4.内部实现细节

    1.textFile会产生两个RDD,1.HadoopRDD,为什么第一个是HadoopRDD,因为我们需要在hadoop里面读取数据,

    读取数据的时候是以(key,value)的形式读取数据,其中的可以是偏移量,而value是一行的数据,

    2.MapPartitionsRDD,以为我们调用了map方法,而这其中的map的作用是把key取消掉了,从而我们把value取出来

    2.xxx.flatMap,则这个产生一个RDD,即MapPartionsRDD,

    3.map((_,1)),这个是读取每一行的数据,然后在对每一行进行操作,然后在生成一个MapPartitionsRDD,

    经过这个RDD之后,这个里面装的都是(key,value)类型的数据

    4.reduceByKey,这个里面new了一个ShuffleRDD,要进行聚合,这个会经历两次聚合,第一聚合是在这个分区里面,

    当聚合完成之后,从上游拉下来,在进行总体的聚合,这就是所谓的先分区,在总体

    5..saveAsTextFile(path:String),因为这个的操作是往hdfs写数据,所以我们需要拿到hdfs的流,不过如果我们用map的话,

    就相当于我的每一条数据我都会拿一个流,这样浪费资源,所以此时的我,使用的是mapPartition(),则此时是拿取一个分区里

    面的数据,我们拿一个流,把这一个分区的数据都写进去

  综上所述,一共产生了6个RDD

/**
  * Created by root on 2016/5/14.
  */
object WordCount {
  def main(args: Array[String]) {
    //非常重要,是通向Spark集群的入口
    val conf = new SparkConf().setAppName("WC")
      .setJars(Array("C:\HelloSpark\target\hello-spark-1.0.jar"))
      .setMaster("spark://node-1.itcast.cn:7077")
    val sc = new SparkContext(conf)

    //textFile会产生两个RDD:HadoopRDD  -> MapPartitinsRDD
    sc.textFile(args(0)).cache()
      // 产生一个RDD :MapPartitinsRDD
      .flatMap(_.split(" "))
      //产生一个RDD MapPartitionsRDD
      .map((_, 1))
      //产生一个RDD ShuffledRDD
      .reduceByKey(_+_)
      //产生一个RDD: mapPartitions
      .saveAsTextFile(args(1))
    sc.stop()
  }
}

  流程图

  wordCount的执行流程第2张

    

免责声明:文章转载自《wordCount的执行流程》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇VS调试问题整理 :无法在Web服务器上启动调试。您不具备调试此应用程序的权限...C语言中位域(bit fields)的可移植问题下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

互联网产品怎么做数据埋点

在互联网产品上线之后,产品和运营人员需要即时了解产品的使用情况,有多少用户,用户使用了哪些功能,停留时长,使用路径。。。等。要回答这些问题,需要有数据,不能拍脑袋想当然。数据怎么得到呢?埋点就是采集数据的重要途径。 数据埋点不是新名词,在电脑网站出来之后就有统计工具,站长们很熟悉的谷歌、百度统计等工具,通过在HTML页面中嵌入它们提供的js代码实现数据采集...

hadoop yarn ui applications list 研究

Yarn提供的web界面可以看到applications历史数据,Yarn Web UI的列表数据如图: 这部分数据的展现方式是前台分页,意思就是比如有2w的历史数据,将一次全部加载,第一次非常慢,后续操作很快。 列表数据最终调用到org.apache.hadoop.yarn.server.resourcemanager.webapp.AppsBlock...

C#使用SqlDataAdapter 实现数据的批量插入和更新

近日由于项目要求在需要实现中型数据的批量插入和更新,晚上无聊,在网上看到看到这样的一个实现方法,特摘抄过来,以便以后可能用到参考。 一.数据的插入 DateTime begin = DateTime.Now; string connectionString = ......; using(SqlConnection conn = new SqlConnec...

HBase之七:事务和并发控制机制原理

作为一款优秀的非内存数据库,HBase和传统数据库一样提供了事务的概念,只是HBase的事务是行级事务,可以保证行级数据的原子性、一致性、隔离性以及持久性,即通常所说的ACID特性。为了实现事务特性,HBase采用了各种并发控制策略,包括各种锁机制、MVCC机制等。本文首先介绍HBase的两种基于锁实现的同步机制,再分别详细介绍行锁的实现以及各种读写锁的应...

8086CPU的结构与功能

CPU结构与功能 不管什么型号的CPU,其内部均有这四大部件 ALU:算术逻辑单元 工作寄存器:分为数据寄存器和地址寄存器 工作寄存器的目的是为了提高运算速度,希望参与运算的数据不从外部存储器去取数据,而是在CPU内部取,所以要有能暂存少量数据的寄存器。 数据寄存器是专门存放数据的,地址寄存器是专门存放地址,进行间接寻址方式,但当地址寄存器不提供地址时...

编译器扩展deprecated

背景 当前的工作与SVN有关,今天使用SVN库编写了一个小程序,编译代码时发现编译器告警:“warning: 'svn_client_ls2' is deprecated (declared at”。svn@linux-rwdx:~/objs/motadou> make g++ webdav.cpp -o webdav -I/home/svn/ap...