spark程序设计

摘要:
..)//128MBvalbdata=sc。广播//广播128MBvalrdd=sc。并行评估观察到的大小=rdd。map(_=˃bdata.value.size

Spark程序设计—创建RDD:从Scala集合构造成RDD

parallelize(a, 3)

makeRDD(a, 3)

他俩使用方式一样,只不过名字不一样

Spark程序设计—创建RDD:本地文件/HDFS

textFile(path, 100)

sequenceFile

wholeTextFiles

举例:

1. 文本文件(TextInputFormat)

  sc.textFile(“file.txt”) //将本地文本文件加载成RDD

  sc.textFile(“directory/*.txt”) //将某类文本文件加载成RDD

  sc.textFile(“/data/input”)

  sc.textFile(“file:///data/input”)

  sc.textFile(“hdfs:///data/input”)

  sc.textFile(“hdfs://namenode:8020/data/input”)

2. sequenceFile文件(SequenceFileInputFormat)(SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File))

  sc.sequenceFile(“file.txt”) //将本地二进制文件加载成RDD

  sc.sequenceFile[String, Int] (“hdfs://nn:9000/path/file”)

3. 使用任意自定义的Hadoop InputFormat

  sc.hadoopFile(path, inputFmt, keyClass, valClass)

Spark程序设计—控制ReduceTask数目

所有key/value RDD操作符均包含一个可选参数,表示reduce task并行度

  words.reduceByKey(_ + _, 5)

  words.groupByKey(5)

  visits.join(pageViews, 5)

用户也可以通过修改spark.default.parallelism设置默认并行度

默认并行度为最初的RDD partition数目

Spark高级程序设计——accumulator

Accumulator累加器,分布式累加器

  类似于MapReduce中的counter,将数据从一个节点发送到其他各个节点上去

  通常用于监控,调试,记录符合某类特征的数据数目等

import SparkContext._
val total_counter = sc.accumulator(0L, "total_counter")
val counter0 = sc.accumulator(0L, "counter0")    //定义两个累加器
val counter1 = sc.accumulator(0L, "counter1")
val count = sc.parallelize(1 to n, slices).map { i =>
total_counter += 1
val x = random * 2 - 1
val y = random * 21
if (x*x + y*y < 1) {
counter1 += 1    //累加器counter1加1
} else {
counter0 += 1    //累加器counter0加1
}
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)//

Spark高级程序设计—广播变量broadcast

广播机制

  高效分发大对象,比如字典(map),集合(set)等,每个executor一份,而不是每个task一份

  包括HttpBroadcast和TorrentBroadcast两种

val data = Set(1, 2, 4, 6, …..) // 大小为128MB
val bdata = sc.broadcast(data)//将大小为128MB的Set广播出去
val rdd = sc.parallelize(1to 1000000, 100)
val observedSizes= rdd.map(_ => bdata.value.size ….)//在各个task中,通过bdata.value获取广播的集合

Spark高级程序设计—cache

val data = sc.textFile("hdfs://nn:8020/input")
data.cache()
//data.persist(StorageLevel.DISK_ONLY_2)

1、如何创建一个分区为2的RDD:

  创建一个RDD,分区为2,即对list进行并行化,并行度为2

scala> val rdd = sc.parallelize(List(1,2,3),2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

  或者启动10个map Task进行处理,对10个分区都进行map处理

val slices = 10
val n = 100000 * slices
val count = sc.parallelize(1 to n, slices).map { i =>
val x = random * 2 - 1
val y = random * 2 - 1
if (x*x + y*y < 1) 1 else 0
}.reduce(_ + _)

2、将一个有两个分区的RDD收集起来

scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3) 

3、得到这个rdd的分区数

scala> rdd.partitions.size
res4: Int = 2

4、想要看每个分区结果

  用glom,glom是将一个RDD的每一个分区都变成Array

scala> rdd.glom.collect
res5: Array[Array[Int]] = Array(Array(1), Array(2, 3))

 5、将rdd写入hdfs

scala> rdd.saveAsTextFile(“hdfs://nn:8020/output”)
或者 scala> rdd.saveAsSequenceFile(“hdfs://nn:8020/output”)

saveastextfile,写的是一个目录,目录下面会生成文件,不要直接指定文件名称

如果rdd会有多个分区,则生成多个文件

 6、将多个RDD合并为一个RDD

rdd1.union(rdd2)
或者
rdd2++rdd1
或者
sc.union(rdd1,rdd2,rdd3)
最终的分区数是他们分区数的和

7、产生10w个文件,每个文件里有100个整数

sc.parallelize(1 to 1000 0000 ,10 0000).map(x = > scala.uril.Random.nextLong).saveTextFile("file:///tmp/)

 8、将数据放到内存,再从内存中清除掉

rdd.cache()   //需要action才触发cache()
//1w行数据,如果用take(1)来触发action,那么只会cache 一行数据,而不会吧1w行数据都放内存
//从内存中清除 rdd.unpersist(true) //true的意思是,是否一直卡着,知道清空在往下运行

9、在集群上提交程序的时候报错:classNotFound,说明类不在jar包里,去从jar中查找

jar tf xxx.jar |grep 文件名或者类名

10、用命令kill掉运行在Hadoop之上的spark程序

yarn application -kill id

 11、x(0), (x(1), x(2))与x._2._2的区别

//数据集如下
//users.dat
//    UserID::Gender::Age::Occupation::Zip-code
//movies.dat
//    MovieID::Title::Genres
//ratings.dat
//    UserID::MovieID::Rating::Timestamp

//加载数据
val usersRdd = sc.textFile(“users.dat”)
val ratingsRdd = sc.textFile(“ratings.dat”)

//数据抽取
//users: RDD[(userID, (gender, age))]
val users = usersRdd.map(_.split("::")).map { x =>
    (x(0), (x(1), x(2)))
}

//rating: RDD[Array(userID, movieID, ratings, timestamp)]
val rating = ratingsRdd.map(_.split("::"))

//usermovie: RDD[(userID, movieID)]
val usermovie = rating.map{ x =>
(x(0), x(1))
}.filter(_._2.equals(MOVIE_ID))

//useRating: RDD[(userID, (movieID, (gender, age))]
val userRating = usermovie.join(users)
//movieuser: RDD[(movieID, (movieTile, (gender, age))]
val userDistribution = userRating.map { x =>
(x._2._2, 1)
}.reduceByKey(_ + _)
userDistribution.foreach(println)

//总结:如果是Array,那么x(0),x(1)代表的是数组中的第0个,第1个元素
//如果是元组(a,b)这种,那么x._1代表的就是a

12、reduceByKey除了_+_,再举例

//.reduceByKey是对相同key做reduce操作,reduce操作除了_+_还有很多其他用法,如下
dataSet.map(line => (extractKey(line), extractStats(line)))
.reduceByKey((a, b) => a.merge(b))
.collect().foreach()

13、groupByKey返回的是什么?

(String,Iterator),value:迭代器里放的是,同一个key对应的一些值的一个集合,如果要求Iterator的数量,count不可以的话,试试size

14、reduceByKey返回的是什么?

//返回的是 RDD
//如果是 WordCount 这种的
//返回的就是 RDD[(String, Int)]

15、groupByKey可以接收函数吗?如何达到reduceByKey一样的结果?

//不可以
//如下
val words = Array("one", "two", "two", "three", "three", "three") val rdd = sc.parallelize(words).map(word => (word, 1)) val a = rdd.reduceByKey(_ + _) val b = rdd.groupByKey().map(t => (t._1, t._2.sum)) a和b结果一样

 16、sortBy怎么用,如何对key-value 的value 降序排序

//拿 WordCount 举例
map((_, 1)) 
.reduceByKey(_+_) 
.sortBy(_._2,false)

17、sortByKey怎么用

rdd,map(x => (x(1),x(2))).sortByKey(false) //降序

免责声明:文章转载自《spark程序设计》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇nodejs安装,配置环境,使用express建立一个新项目junit私有方法测试下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Spark程序排错

1.shuffle相关 报错提示 org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 org.apache.spark.shuffle.FetchFailedException: Failed to connect...

spark作业性能调优

spark作业性能调优 优化的目标 保证大数据量下任务运行成功 降低资源消耗 提高计算性能 一、开发调优: (1)避免创建重复的RDD RDD lineage,也就是“RDD的血缘关系链” 开发RDD lineage极其冗长的Spark作业时,创建多个代表相同数据的RDD,进而增加了作业的性能开销。 (2)尽可能复用同一个RDD 比如说,有一个RDD的...

想高效学会Hadoop,你要按照这个路线

学习hadoop,首先我们要知道hadoop是什么? 说到底Hadoop只是一项分布式系统的工具,我们在学习的时候要理解分布式系统设计中的原则以及方法,只有这样才能以不变应万变。再一个就是一定要动手,有什么案例,有什么项目一定要亲自动手去敲。 学习的时候不要害怕遇到问题,问题是最好的老师。其实学习的过程就是逐渐解决问题的过程,当你遇到的问题越来越少的时候,...

Spark(十六)DataSet

  Spark最吸引开发者的就是简单易用、跨语言(Scala, Java, Python, and R)的API。 本文主要讲解Apache Spark 2.0中RDD,DataFrame和Dataset三种API;它们各自适合的使用场景;它们的性能和优化;列举使用DataFrame和DataSet代替RDD的场景。本文聚焦DataFrame和Datase...

大数据技术-spark+hive+hbase研究

大数据 spark 研究(0基础入门)一 背景 1 基础 Scala 语言基础:Scala详细总结(精辟版++) spark 介绍    :  spark介绍     二 环境 1 部署spark   <![if !supportLists]>1、<![endif]>环境准备(1)配套软件版本要求: Java 6+  Python...

可用于Hadoop下的ETL工具——Kettle

看大家分享了好多hadoop相关的一些内容,我为大家介绍一款ETL工具——Kettle。    Kettle是pentaho公司开源的一款ETL工具,跟hadoop一样,也是java实现,其目的就是做数据整合中时数据的抽取(Extract)、转换(Transformat)、加载(Load)工作。Kettle中有两种脚本文件,transformation和j...