Spark Partition

摘要:
根据字段划分分区,类似于关系数据库中的分区。默认情况下,Spark从HDFS读取的分区数等于HDFS文件中的块数。区别在于partitionBy只能用于PairRdd(键值数据)。

分区的意义

Spark RDD 是一种分布式的数据集,由于数据量很大,因此它被切分成不同分区并存储在各个Worker节点的内存中。从而当我们对RDD进行操作时,实际上是对每个分区中的数据并行操作。Spark根据字段进行partition类似于关系型数据库中的分区,可以加大并行度,提高执行效率。Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。 

Spark Partition第1张Spark Partition第2张

Spark Partition第3张     Spark Partition第4张

1. RDD repartition和partitionBy的区别

spark中RDD两个常用的重分区算子,repartition 和 partitionBy 都是对数据进行重新分区,默认都是使用 HashPartitioner,区别在于partitionBy 只能用于 PairRdd(key-value类型的数据),但是当它们同时都用于 PairRdd时,效果也是不一样的。reparation的分区比较的随意,没有什么规律,而partitionBy把相同的key都分到了同一个分区。

val parRDD = pairRDD.repartition(10) //重分区为10;
val parRDD = pairRDD.partitionBy(new HashPartitioner(10)) //重分区为10;

Spark Partition第5张Spark Partition第6张
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD 

object PartitionDemo {
 
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("localTest").setMaster("local[4]")
    val sc = new SparkContext(conf)
    
    val rdd = sc.parallelize(List("hello", "jason", "what", "are", "you", "doing","hi","jason","do","you","eat","dinner",
            "hello","jason","do","you","have","some","time","hello","jason","time","do","you","jason","jason"),4) //设置4个分区;
    val word_count = rdd.flatMap(_.split(",")).map((_,1)) 
    val repar = word_count.repartition(10)                       //重分区为10;
    val parby = word_count.partitionBy(new HashPartitioner(10))  //重分区为10;
    print(repar)
    print(parby)
  }
  
  def print(rdd : RDD[(String, Int)]) = {
    rdd.foreachPartition(pair=>{
      println("partion " + TaskContext.get.partitionId + ":") 
      pair.foreach(p=>{ println("  " + p) })
    })
    println
  }
}
View Code

partitionBy的三种分区方式:

1、HashPartitioner
val parRDD= pairRDD.partitionBy(new HashPartitioner(3))
HashPartitioner确定分区的方式:partition = key.hashCode () % numPartitions

2、RangePartitioner
val parRDD= pairRDD.partitionBy(new RangePartitioner(3,counts))
RangePartitioner会对key值进行排序,然后将key值被划分成3份key值集合。

3、CustomPartitioner
CustomPartitioner可以根据自己具体的应用需求,自定义分区。

class CustomPartitioner(numParts: Int) extends Partitioner {
 override def numPartitions: Int = numParts
 override def getPartition(key: Any): Int =
 {
      if(key==1)){ 0 }
      else if (key==2){ 1} 
      else{ 2 }
  } 
}
val parRDD = pairRDD.partitionBy(new CustomPartitioner(3))        

2. DataFrame分区 

1. repartition:根据字段分区
val regionDf = peopleDf.repartition($"region")

2. coalesce: coalesce一般用于合并/减少分区,将数据从一个分区移到另一个分区。
val peopleDF2= peopleDF.coalesce(2) // 原来分区为4,减少到2, 无法增加分区数,例如peopleDF.coalesce(6)执行完分区还是4

二者区别The repartition algorithm does a full shuffle of the data and creates equal sized partitions of data. coalesce combines existing partitions to avoid a full shuffle.

为什么使用repartition而不用coalesce? A full data shuffle is an expensive operation for large data sets, but our data puddle is only 2,000 rows. The repartition method returns equal sized text files, which are more efficient for downstream consumers. (non-partitioned) It took 241 seconds to count the rows in the data puddle when the data wasn’t repartitioned (on a 5 node cluster). (partitioned) It only took 2 seconds to count the data puddle when the data was partitioned — that’s a 124x speed improvement!

3. DataFrameWriter 分段和分区

1. bucketBy:分段和排序仅适用于持久表。 对于基于文件的数据源,可以对输出进行分类。
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

2. partitionBy:分区则可以同时应用于save和saveAsTable
peopleDF.write.partitionBy("region").format("parquet").save("people_partitioned.parquet")

saveAsTable 保存数据并持久化表
         DataFrame可以使用saveAsTable 命令将其作为持久表保存到Hive Metastore中。Spark将为您创建一个默认的本地Hive Metastore(使用Derby)。与createOrReplaceTempView命令不同的是, saveAsTable将实现DataFrame的内容并创建指向Hive Metastore中的数据的指针。即使您的Spark程序重新启动后,永久性表格仍然存在,只要您保持与同一Metastore的连接即可。用于持久表的DataFrame可以通过使用表的名称调用tablea方法来创建SparkSession。
       持久化表时您可以自定义表格路径 ,例如df.write.option("path", "/some/path").saveAsTable("t")。当表被删除时,自定义表路径将不会被删除,表数据仍然存在。如果没有指定自定义表格路径,Spark会将数据写入仓库目录下的默认表格路径。当表被删除时,默认的表路径也将被删除。

4. JDBC partition

Spark提供jdbc方法操作数据库,每个RDD分区都会建立一个单独的JDBC连接。 尽管用户可以设置RDD的分区数目,在一些分布式的shuffle操作(例如reduceByKey 和join)之后,RDD又会变成默认的分区数spark.default.parallelism,这种情况下JDBC连接数可能超出数据库的最大连接。Spark 2.1提供numPartitions 参数来设置JDBC读写时的分区数,可以解决前面说的问题。如果写数据时的分区数超过最大值,我们可以在写之前使用方法coalesce(numPartitions)来减少分区数

val userDF = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> sourceTable, "lowerBound"->"1", "upperBound"->"886500", "partitionColumn"->"user_id", "numPartitions"->"10")).load()

userDF.write.option("maxRecordsPerFile", 10000).mode("overwrite").parquet(outputDirectory)
userDF.repartition(10000).write.mode("overwrite").parquet(outputDirectory)

 

分区案例

val df = spark.read.format("jdbc").options(Map("url" -> url, "dbtable" -> sourceTable, "lowerBound"->"1", "upperBound"->"825485207", "partitionColumn"->"org_id", "numPartitions"->"10")).load()

(1) jdbc partition: df.write.format("com.databricks.spark.csv").mode("overwrite").save(s"$filePath/$filename"+"_readpar")
(2) maxRecordsPerFile: df.write.option("maxRecordsPerFile", 10000).format("com.databricks.spark.csv").mode("overwrite").save(s"$filePath/$filename"+"_maxRecd")
(3) repartition: df.repartition(4).write.format("com.databricks.spark.csv").mode("overwrite").save(s"$filePath/$filename"+"_repar")
(4) rdd key-value partitionBy: df.rdd.map(r => (r.getInt(1), r)).partitionBy(new HashPartitioner(10)).values.saveAsTextFile(s"$filePath/$filename"+"_rddhash")

(1) jdbc partition:数据分布不均匀,有些分区数据多有的少; key是有序的,根据bound区间将key分成不同分区

Spark Partition第7张  Spark Partition第8张

(2) maxRecordsPerFile:同上,当一个分区条数超过maxRecordsPerFile,会被拆分成多个子分区,同一个Key可能因此被分到不同分区

Spark Partition第9张

(3) repartition:分成同等大小的分区(不能保证每个分区的条数是一样的); key是无序的,同样的key可能在不同分区

Spark Partition第10张

 (4) rdd key-value partitionBy: 使用partition方法将数据按照一定规则分区,可以自定义分区规则

 Spark Partition第11张  Spark Partition第12张

--------------------- 

作者:zhangzeyuan56 
来源:CSDN 
原文:https://blog.csdn.net/zhangzeyuan56/article/details/80935034 
版权声明:本文为博主原创文章,转载请附上博文链接!

---------------------
作者:junzhou134
来源:CSDN
原文:https://blog.csdn.net/m0_37138008/article/details/78936029
版权声明:本文为博主原创文章,转载请附上博文链接!

--------------------- 
作者:JasonLeeblog 
来源:CSDN 
原文:https://blog.csdn.net/xianpanjia4616/article/details/84328928 
版权声明:本文为博主原创文章,转载请附上博文链接!

免责声明:文章转载自《Spark Partition》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇软件开发基本原则(一)—— 策略和因素 (转)08年为何报土木工程?下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

homebrew & brew cask使用技巧及Mac软件安装

homebrew 安装 /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 命令 安装软件:brew install 软件名,例:brew install wget 搜索软件:brew search 软件名...

(转)大型分布式网站架构技术总结

本文是学习大型分布式网站架构的技术总结。对架构一个高性能,高可用,可伸缩,可扩展的分布式网站进行了概要性描述,并给出一个架构参考。一部分为读书笔记,一部分是个人经验总结。对大型分布式网站架构有很好的参考价值。   本次分享大纲如下 大型网站的特点 大型网站架构目标 大型网站架构模式 高性能架构 高可用架构 可伸缩架构 可扩展架构 安全架构 敏捷架构 大型架...

WPF用样式实现TextBox的虚拟提示效果

【版权声明】本文为博主原创,未经允许禁止用作商业用途,如有转载请注明出处。  话说好多软件和网站都能实现虚拟提示,好吧这个名词是我自己起的,因为我也不知道这么形容这个效果。   效果描述:在TextBox没有输入值之前显示灰色提示信息,当获得焦点时提示自动消失,如果没有编辑离开此控件则依然显示提示信息,直到输入值为止。   效果图:   这里我用到了一个...

Spring Boot源码(一)Spring Boot源码环境搭建

一、前言   既然要分析源码,那就直接下载源码来本地运行分析,是最有效的方案,但是在开始看这篇博客之前,希望小伙伴们有个心理准备...   源码编译是比较麻烦的一件事,我大概整了一天才基本整好源码环境,期间可能遇到各种奇奇怪怪的问题上网找答案,这里把流程记录一下,需要的小伙伴可以直接跟着步骤走,还是可以顺利编译通过的,亲测可行。 二、源码环境搭建 下载源码...

Vue生命周期钩子---3

vue生命周期流程图:4张图 : 生命周期的解析和应用: Vue 实例有一个完整的生命周期,也就是从开始创建、初始化数据、编译模板、挂载Dom→渲染、更新→渲染、卸载等一系列过程,我们称这是 Vue 的生命周期。通俗说就是 Vue 实例从创建到销毁的过程,就是生命周期。 beforecreate : 完成实例初始化,初始化非响应式变量this指向创建的...

win批处理(笔记)

@echo off (关闭执行过程,只显示结果) color 0a 黑绿 title 主题名 echo 打印输出 pause暂停 echo.空一行;换行; echo 垃圾清理 d: >nul 2 >nul cd >nul 2>nul rd . sq >nul 2>nul ping -n 10 10.1.1.1 >n...