Spark SQL概述

摘要:
前言:用sparkcore编写一些逻辑很麻烦。用sql表示它太方便了。1.什么是SparkSQL?SparkSQL是一个处理结构化数据的Spark组件。SparkSQL提供了两种操作数据的方法:sql查询DataFrames/Datasets APISparkSQL=Schema+RDD。2.SparkSQL引入的主要动机是更快地编写和运行Spark程序,编写更少的代码,读取更少的数据。让优化器自动优化

前言:一些逻辑用spark core 来写,会比较麻烦,如果用sql 来表达,那简直太方便了

一、Spark SQL 是什么

是专门处理结构化数据的 Spark 组件

Spark SQL 提供了两种操作数据的方法:

  sql 查询

  DataFrames/Datasets API

Spark SQL = Schema + RDD

二、Spark SQL引入的主要动机

更快地编写和运行Spark程序

编写更少的代码,读取更少的数据,让优化器自动优化程序,释放程序员的工作

Spark SQL概述第1张

三、Spark SQL总体架构

Spark SQL概述第2张

Spark SQL 最底层是 Spark Core,上面的 Catalyst 是一个执行计划的优化器,可以帮助优化查询

在 Catalyst 之上还有两个组件,SQL 和 DataFrame/Dataset ,这两个组件上层对应的接口不一样,SQL 对应的是纯粹的 sql 语句的输入,DataFrame/Dataset 对应的是他们的api产生的输入

SQL 和 DataFrame/Dataset 这两个组件最终的结果都会输入到 Catalyst 这个优化器,等优化后,最终结果会交给 Spark Core 来运行

下面三行是 Spark SQL 的套件,在之上还有一些更高级的 API ,比如机器学习等

四、SQL 与 DataFrame/Dataset

Spark 提供了两种编写 Spark SQL 程序的 API,使用 SQL 查询或者使用 DataFrame/Dataset

使用SQL

  如果你非常熟悉SQL语法,则使用SQL

使用DataFrame/Dataset

  DSL(Domain Specific Language):DSL是一个规范,比如上上个图中的 table,avg,groupby 就是,网上自行搜索 DSL

  采用更通用的语言(Scala,Python)表达你的查询需求

  使用DataFrame更快的捕获错误:SQL 是编译时不检查,运行时检查,而 DataFrame 则是在编译时就去检查,比如检查列是否存在,列类型是否正确

五、Spark SQL API演化

Spark SQL概述第3张

1.3后引入了 DataFrame,但之后发现有一些限制,又引入 Dataset,本来他们两个是不同的 API,后来版本中发现 DataFrame 和 Dataset 有互通的地方,于是2.0里面,DataFrame 成了 Dataset 的子集

1、RDD API(2011)

  JVM对象组成的分布式数据集合

  不可变且具有容错能力

  可处理结构化与非结构化数据

  函数式转换

2、RDD API的局限性

  无Schema

  用户自己优化程序

  从不同的数据源读取数据非常困难

  合并多个数据源中的数据也非常困难

3、DataFrame API(2013)

  Row对象组成的分布式数据集合:一个数据集有很多个记录构成,每个记录都是一个 Row 的对象,Row 保存的信息有,包含哪些列,列名是什么,每一列是什么数据类型

  不可变且具有容错能力

  处理结构化数据

  自带优化器Catalyst,可自动优化程序

  Data source API: DataFrame 比 RDD API 更方便的一点是,有一个 Data source API,它可以让用户非常方便的去读取各种数据源的数据

  所以 DataFrame 内部是无类型的,即 Row 是无类型的,但是 Row 这一行数据里面是有类型的

   DataSet 内部是有类型的,java 对象,需要用户自己去定义

  DataFrame 是一种特殊类型的 DataSet

4、DataFrame API的局限性

  运行时类型检查

  不能直接操作domain对象

  函数式编程风格

  举例:

val dataframe = sqlContext.read.json("people.json”)
dataframe.filter("salary > 1000").show()
//局限性 Throws Runtime exception
org.apache.spark.sql.AnalysisException: cannot resolve 'salary' given input columns age,
name;

//Create RDD[Person]
val personRDD = sc.makeRDD(Seq(Person("A",10), Person("B",20)))
//Create dataframe from a RDD[Person]
val personDF = sqlContext.createDataFrame(personRDD)
//We get back RDD[Row] and not RDD[Person]
personDF.rdd
//局限性 RDD 转换为 DF , DF再转回 RDD 后,会丢失一些信息

注:Spark RDD、DataFrame和DataSet的区别自行网上搜索

5、Dataset

  Dataset 扩展自 DataFrame API,提供了编译时类型安全,面向对象风格的 API

Spark SQL概述第4张Spark SQL概述第5张
case class Person(name: String, age: Int)
val dataframe = sqlContext.read.json("people.json")
val ds : Dataset[Person] = dataframe.as[Person]
// Compute histogram of age by name
val hist = ds.groupBy(_.name).mapGroups({
case (name, people) => {
val buckets = new Array[Int](10)
people.map(_.age).foreach { a =>
buckets(a / 10) += 1
}
(name, buckets)
}
})
View Code

  Dataset API

    类型安全:可直接作用在domain对象上

//Create RDD[Person]
val personRDD = sc.makeRDD(Seq(Person("A",10), Person("B",20)))
//Create Dataset from a RDD
val personDS = sqlContext.createDataset(personRDD)
personDS.rdd
//We get back RDD[Person] and not RDD[Row] in Dataframe

    高效:代码生成编解码器,序列化更高效

    协作:Dataset与Dataframe可相互转换

  编译时类型检查

case class Person(name: String, age: Long)
val dataframe = sqlContext.read.json("people.json")
val ds : Dataset[Person] = dataframe.as[Person] 
ds.filter(p => p.age > 25) ds.filter(p => p.salary > 12500) //error: value salary is not a member of Person

免责声明:文章转载自《Spark SQL概述》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇EasyUI 动态更新列异步FIFO结构及FPGA设计 跨时钟域设计下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Hadoop的管理目录

HDFS文件结构 1、NameNode的文件结构,NameNode会创建VERSION、edits、fsimage、fstime文件目录。其中dfs.name.dir属性是一个目录列表,是每个目录的镜像文件。VERSION文件是JAVA属性文件,其中包含运行HDFS的版本信息。包含内容:   其中,namespaceID是文件系统的唯一标识符,当文件系统...

MySQL查询性能优化

1.为什么查询速度为变慢 在尝试编写快速的查询之前,需要清楚一点,真正重要是响应时间。如果把查询看作是一个任务,那么他由一系列子任务组成,每个子任务都会消耗一定的时间。如果要优化查询,实际上要优化其子任务,要么消除其中一些子任务,要么减少子任务的执行的次数,要么让子任务运行得更快。 MySQL在执行查询的时候有哪些子任务。哪些子任务运行的速度很慢,这里很难...

Spark官方调优文档翻译(转载)

Spark调优由于大部分Spark计算都是在内存中完成的,所以Spark程序的瓶颈可能由集群中任意一种资源导致,如:CPU、网络带宽、或者内存等。最常见的情况是,数据能装进内存,而瓶颈是网络带宽;当然,有时候我们也需要做一些优化调整来减少内存占用,例如将RDD以序列化格式保存(storing RDDs in serialized form)。本文将主要涵盖...

5.2Python数据处理篇之Sympy系列(二)---Sympy的基本操作

目录 目录 目录 前言 (一)符号的初始化与输出设置-symbol() symbols() latex() (1)说明: (2)源代码: (3)输出效果 1.作用: 2.操作: (二)替换符号-subs(old,new) (1)是否改变原表达式 (2)替换多个表达式 1.说明: 2.源代码: 3.输出效果: 4.注意点: (三)将字符串变为s...

[Spark]-作业调度与动态资源分配

1.概述   由 Spark 集群篇 ,每个Spark应用(其中包含了一个SparkContext实例),都会运行一些独占的执行器(executor)进程.集群调度器会提供对这些 Spark 应用的资源调度.   而在各个Spark应用内部,各个线程可能并发地通过action算子提交多个Spark作业(job).这里就是Spark的作业调度   Spark...

mysql优化之N+1问题

  在网上找了小马哥视频来学习了一下mysql的优化。准备写些博客来做个总结,加深记忆。 什么是N+1问题   A对象关联B对象,A对象进行列表展示时需显示B对象的关联属性,这样需要先用一条sql将N个A对象查询出来,再用N条sql将这些对象的关联属性查询出来。违背了减少数据库交互原则,影响性能。 解决方法   方法一:连接查询,在查询A对象的时候,将关...