《Spark Python API 官方文档中文版》 之 pyspark.sql (三)

摘要:
我想在这里记录一下,我希望为那些对Spark感兴趣并从事大数据开发的人提供有价值的中文材料,这将有助于PySpark开发人员的工作和学习。
摘要:在Spark开发中,由于需要用Python实现,发现API与Scala的略有不同,而Python API的中文资料相对很少。每次去查英文版API的说明相对比较慢,还是中文版比较容易get到所需,所以利用闲暇之余将官方文档翻译为中文版,并亲测Demo的代码。在此记录一下,希望对那些对Spark感兴趣和从事大数据开发的人员提供有价值的中文资料,对PySpark开发人员的工作和学习有所帮助。

官网地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html            

pyspark.sql module

Module Context

Spark SQL和DataFrames重要的类有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 将分布式数据集分组到指定列名的数据框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame数据的行
pyspark.sql.HiveContext 访问Hive数据的主入口
pyspark.sql.GroupedData 由DataFrame.groupBy()创建的聚合方法集
pyspark.sql.DataFrameNaFunctions 处理丢失数据(空数据)的方法
pyspark.sql.DataFrameStatFunctions 统计功能的方法
pyspark.sql.functions DataFrame可用的内置函数
pyspark.sql.types 可用的数据类型列表
pyspark.sql.Window 用于处理窗口函数

4.class pyspark.sql.GroupedData(jdf, sql_ctx)

由DataFrame.groupBy()创建的DataFrame上的一组聚合方法。

4.1 agg(*exprs)

计算聚合并将结果作为DataFrame返回。
可用的集合函数是avg,max,min,sum,count。
如果exprs是从字符串到字符串的单个字典映射,那么键是要执行聚合的列,值是聚合函数。
另外,exprs也可以是聚合列表达式的列表。
参数:●  exprs – 从列名(字符串)到聚集函数(字符串)的字典映射或列的列表。

>>> gdf = df.groupBy(df.name)
>>> gdf.agg({"*": "count"}).collect()
[Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]
>>> from pyspark.sql import functions as F
>>> gdf.agg(F.min(df.age)).collect()
[Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]

4.2 avg(*args)

计算每个组的每个数字列的平均值。
mean()是avg()的别名。
参数:●  cols – 列名称列表(字符串),非数字列被忽略。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.groupBy().avg('age').collect()
[Row(avg(age)=3.5)]
>>> l3=[('Alice',2,85),('Bob',5,80)]
>>> df3 = sqlContext.createDataFrame(l3,['name','age','height'])
>>> df3.groupBy().avg('age', 'height').collect()
[Row(avg(age)=3.5, avg(height)=82.5)]

4.3 count()

统计每个组的记录数。

>>> df.groupBy(df.age).count().collect()
[Row(age=2, count=1), Row(age=5, count=1)] 

4.4 max(*args)

计算每个组的每个数字列的最大值。

>>> df.groupBy().max('age').collect()
[Row(max(age)=5)]
>>> df3.groupBy().max('age', 'height').collect()
[Row(max(age)=5, max(height)=85)]

4.5 mean(*args)

计算每个组的每个数字列的平均值。
mean()是avg()的别名。
参数:●  cols – 列名称列表(字符串),非数字列被忽略。

>>> df.groupBy().mean('age').collect()
[Row(avg(age)=3.5)]
>>> df3.groupBy().mean('age', 'height').collect()
[Row(avg(age)=3.5, avg(height)=82.5)]

4.6 min(*args)

计算每个组的每个数字列的最小值。
参数:●  cols – 列名称列表(字符串),非数字列被忽略。

>>> df.groupBy().min('age').collect()
[Row(min(age)=2)]
>>> df3.groupBy().min('age', 'height').collect()
[Row(min(age)=2, min(height)=80)]

4.7 pivot(pivot_col, values=None)

旋转当前[[DataFrame]]的列并执行指定的聚合。 有两个版本的透视函数:一个需要调用者指定不同值的列表以进行透视,另一个不需要。 后者更简洁但效率更低,因为Spark需要首先在内部计算不同值的列表。
参数:●  pivot_col – 要旋转的列的名称。
      ●  values – 将被转换为输出DataFrame中的列的值的列表。

// 计算每个课程每年的收入总和作为一个单独的列
>>> l4=[(2012,'dotNET',10000),(2012,'dotNET',5000),(2012,'Java',20000),(2013,'dotNET',48000),(2013,'Java',30000)]
>>> df4 = sqlContext.createDataFrame(l4,['year','course','earnings'])
>>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect() 
[Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]
// 或者不指定列值(效率较低)
>>> df4.groupBy("year").pivot("course").sum("earnings").collect() 
[Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]

4.8 sum(*args) 

计算每个组的每个数字列的总和。
参数:●  cols – 列名称列表(字符串),非数字列被忽略。

>>> df.groupBy().sum('age').collect()
[Row(sum(age)=7)]
>>> df3.groupBy().sum('age', 'height').collect()
[Row(sum(age)=7, sum(height)=165)]

5.class pyspark.sql.Column(jc)

DataFrame中的一列。
列实例可以通过以下方式创建:

# 1. Select a column out of a DataFrame
df.colName
df["colName"]
# 2. Create from an expression
df.colName + 1
1 / df.colName

5.1 alias(*alias)

使用新名称返回此列的别名(在返回多个列的表达式情况下如explode)。

>>> df.select(df.age.alias("age2")).collect()
[Row(age2=2), Row(age2=5)]

5.2 asc()

基于给定列名称的升序返回一个排序表达式。

5.3 astype(dataType)

将列转换为dataType类型。

>>> df.select(df.age.astype("string").alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
>>> from pyspark.sql.types import StringType
>>> df.select(df.age.astype(StringType()).alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]

5.4 between(lowerBound, upperBound)

一个布尔表达式,如果此表达式的值位于给定列之间,则该表达式的值为true。

>>> df.select(df.name, df.age.between(2, 4)).show()
+-----+--------------------------+
| name|((age >= 2) && (age <= 4))|
+-----+--------------------------+
|Alice|                      true|
|  Bob|                     false|
+-----+--------------------------+

5.5 bitwiseAND(other)

二元运算符

5.6 bitwiseOR(other)

二元运算符

5.7 bitwiseXOR(other)

二元运算符

5.8 cast(dataType)

将列转换为dataType类型。

>>> df.select(df.age.cast("string").alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
>>> df.select(df.age.cast(StringType()).alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]

5.9 desc()

基于给定列名称的降序返回一个排序表达式。

5.10 endswith(other)

二元运算符

5.11 getField(name) 

在StructField中通过名称获取字段的表达式。

>>> from pyspark.sql import Row
>>> df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
>>> df.select(df.r.getField("b")).show()
+----+
|r[b]|
+----+
|   b|
+----+
>>> df.select(df.r.a).show()
+----+
|r[a]|
+----+
|   1|
+----+

5.12 getItem(key)

从列表中获取位置序号项,或者通过字典的key获取项的表达式。

>>> df = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
>>> df.select(df.l.getItem(0), df.d.getItem("key")).show()
+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+
>>> df.select(df.l[0], df.d["key"]).show()
+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+

5.13 inSet(*cols)

一个布尔表达式,如果此表达式的值由参数的评估值包含,则该值被评估为true。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df[df.name.inSet("Bob", "Mike")].collect()
[Row(name=u'Bob', age=5)]
>>> df[df.age.inSet([1, 2, 3])].collect()
[Row(name=u'Alice', age=2)]

注:在1.5中已过时,用Column.isin()代替。

5.14 isNotNull()

如果当前表达式不为null,则为真。

5.15 isNull()

如果当前表达式为null,则为真。

5.16 isin(*cols)

一个布尔表达式,如果此表达式的值由参数的评估值包含,则该值被评估为true。

>>> df[df.name.isin("Bob", "Mike")].collect()
[Row(name=u'Bob', age=5)]
>>> df[df.age.isin([1, 2, 3])].collect()
[Row(name=u'Alice', age=2)]

5.17 like(other)

二元运算符

5.18 otherwise(value)

评估条件列表并返回多个可能的结果表达式之一。 如果不调用Column.otherwise(),则不匹配条件返回None。
例如,请参阅pyspark.sql.functions.when()
参数:● value – 一个文字值或一个Column表达式。

>>> from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
+-----+---------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0|
+-----+---------------------------------+
|Alice|                                0|
|  Bob|                                1|
+-----+---------------------------------+

5.19 over(window)

定义一个窗口列。
参数:window – 一个WindowSpec
返回:一列
注:Window方法仅再HiveContext1.4支持。

5.20 rlike(other)

二元运算符

5.21 startswith(other)

二元运算符

5.22 substr(startPos, length)

返回一个新列,它是列的一个子字符串。
参数:● startPos – 其实位置 (int或者Column)
           ● length – 子串的长度(int或者Column)

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.select(df.name.substr(1, 3).alias("col")).collect()
[Row(col=u'Ali'), Row(col=u'Bob')]

5.23 when(condition, value)

评估条件列表并返回多个可能的结果表达式之一。 如果不调用Column.otherwise(),则不匹配条件返回None。
例如,请参阅pyspark.sql.functions.when()。
参数:● condition – 一个布尔类型的列表达式。
           ● value – 一个文字值或一个列表达式。

>>> from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+--------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0|
+-----+--------------------------------------------------------+
|Alice|                                                      -1|
|  Bob|                                                       1|
+-----+--------------------------------------------------------+

6. class pyspark.sql.Row

DataFrame中的一行,其中的字段可以像属性一样访问。
Row可以用来通过使用命名参数来创建一个行对象,字段将按名称排序。

>>> from pyspark.sql import Row
>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row['name'], row['age']
('Alice', 11)
>>> row.name, row.age
('Alice', 11)

Row也可以用来创建另一个Row像类一样,然后它可以被用来创建Row对象,比如

>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> Person("Alice", 11)
Row(name='Alice', age=11)

6.1 asDict(recursive=False)

作为字典返回
参数:● recursive – 将嵌套的Row转换为字典(默认值:False)。

>>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11}
True
>>> row = Row(key=1, value=Row(name='a', age=2))
>>> row.asDict() == {'key': 1, 'value': Row(age=2, name='a')}
True
>>> row.asDict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}}
True

7. class pyspark.sql.DataFrameNaFunctions(df)

在DataFrame中处理丢失的数据的功能。

7.1 drop(how='any', thresh=None, subset=None)

返回一个新的DataFrame,省略含有空值的行。DataFrame.dropna()和 DataFrameNaFunctions.drop()是彼此的别名。
参数:● how – 'any'或者'all'.如果为'any', 如果它包含任何空值,则丢掉一行。如果为'all',只有当它的所有值都为空时才丢掉一行。
           ● thresh – 默认值为None,如果指定为int,删除小于阈值的非空值的行。 这将覆盖how参数。
           ● subset – 要考虑的列名的可选列表。

>>> l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)]
>>> df4 = sqlContext.createDataFrame(l4,['name','age','height'])
>>> df4.na.drop().show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 10|    80|
+-----+---+------+

7.2 fill(value, subset=None)

DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.
替换null值,是na.fill()的别名。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的别名。
参数:● value – 整形,长整形,浮点型,字符串,或者字典。用来替换空值的值。如果值是字典,则subset将被忽略,值必须是从列名(字符串)到要替换值的映射。替换值必须是整形,长整形,浮点型或字符串。
           ● subset – 要替换的列名的可选列表。在subset指定的列,如果不具有匹配的数据类型会被忽略。例如,如果value是一个字符串,并且subset包含一个非字符串列,那么非字符串列将被忽略。

>>> df4.na.fill(50).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 10|    80|
|  Bob|  5|    50|
|  Tom| 50|    50|
| null| 50|    50|
+-----+---+------+
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+-------+---+------+
|   name|age|height|
+-------+---+------+
|  Alice| 10|    80|
|    Bob|  5|  null|
|    Tom| 50|  null|
|unknown| 50|  null|
+-------+---+------+

7.3 replace(to_replace, value, subset=None)

返回用另外一个值替换了一个值的新的DataFrame。DataFrame.replace() 和 DataFrameNaFunctions.replace()是彼此的别名。
参数:● to_replace – 整形,长整形,浮点型,字符串,或者列表。要替换的值。如果值是字典,那么值会被忽略,to_replace必须是一个从列名(字符串)到要替换的值的映射。要替换的值必须是一个整形,长整形,浮点型,或者字符串。
          ● value – 整形,长整形,浮点型,字符串或者列表。要替换为的值。要替换为的值必须是一个整形,长整形,浮点型,或者字符串。如果值是列表或者元组,值应该和to_replace有相同的长度。
          ● subset – 要考虑替换的列名的可选列表。在subset指定的列如果没有匹配的数据类型那么将被忽略。例如,如果值是字符串,并且subset参数包含一个非字符串的列,那么非字符串的列被忽略。

>>> l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)]
>>> df4 = sqlContext.createDataFrame(l4,['name','age','height'])
>>> df4.na.replace(10, 20).show()
+-----+----+------+
| name| age|height|
+-----+----+------+
|Alice|  20|    80|
|  Bob|   5|  null|
|  Tom|null|  null|
| null|null|  null|
+-----+----+------+
>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+----+------+
|name| age|height|
+----+----+------+
|   A|  10|    80|
|   B|   5|  null|
| Tom|null|  null|
|null|null|  null|
+----+----+------+

8. class pyspark.sql.DataFrameStatFunctions(df)

DataFrame的统计函数的功能。

8.1 corr(col1, col2, method=None)

以双精度值计算DataFrame的两列的相关性。目前只支持皮尔森相关系数. DataFrame.corr() and DataFrameStatFunctions.corr() 互为别名。

参数:● col1 – 第一列的名称
           ● col2 – 第二列的名称
           ● method – 相关方法,目前只支持“皮尔森”

8.2 cov(col1, col2)

计算给定列的样本协方差(由它们的名称指定)作为双精度值。DataFrame.cov() and DataFrameStatFunctions.cov() 互为别名。

参数:● col1 – 第一列的名称
           ● col2 – 第二列的名称

8.3 crosstab(col1, col2)

计算给定列的成对频率表. 也被称为应急表. 每列的去重后不同值的数量应小于1e4. 最多1e6非零对频率将被返回. 每行的第一列将是col1的不同值,列名将是col2的不同值.第一列的名称应该为$col1_$col2. 没有出现的对数将为零. DataFrame.crosstab() and DataFrameStatFunctions.crosstab() 互为别名

参数:● col1 – 第一列的名称. 去重项将成为每一行的第一项。
           ● col2 – 第二列的名称. 去重项将成为DataFrame的列名称。

8.4 freqItems(cols, support=None)

找到列的频繁项,可能有误差。使用“http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou”中描述的频繁元素计数算法。 DataFrame.freqItems() and DataFrameStatFunctions.freqItems()互为别名。

注:此功能用于探索性数据分析,因为我们不保证所生成的DataFrame的模式的向后兼容性。
参数:● cols – 用于计算频繁项的列的名称,为字符串的列表或元组。
           ● support –“频繁”项目的频率。 默认值是1%,必须大于1e-4。

8.5 sampleBy(col, fractions, seed=None)

根据每层上给出的分数返回一个没有更换的分层样本。
参数:● col – 定义分层的列
           ● fractions – 每层的抽样比例,如果没有指定层,我们将其分数视为零。
           ● seed – 随机值
返回: 一个代表分层样本的新DataFrame

>>> from pyspark.sql.functions import col
>>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
>>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
>>> sampled.groupBy("key").count().orderBy("key").show()
+---+-----+
|key|count|
+---+-----+
|  0|    5|
|  1|    9|
+---+-----+

9. class pyspark.sql.Window

用于在DataFrame中定义窗口的实用函数。
例如:

>>> # PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
>>> window = Window.partitionBy("country").orderBy("date").rowsBetween(-sys.maxsize, 0)

>>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
>>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)

9.1 static orderBy(*cols)

用定义的顺序创建一个WindowSpec。

9.2 static partitionBy(*cols)

用定义的分区创建一个WindowSpec。

10. class pyspark.sql.WindowSpec(jspec)

定义分区,排序和框边界的窗口规范。
使用Window中的静态方法创建一个WindowSpec

10.1 orderBy(*cols)

定义WindowSpec中的排序列。
参数:● cols – 列或表达式的名称

10.2 partitionBy(*cols)

定义WindowSpec中的分区列。
参数:● cols – 列或表达式的名称

10.3 rangeBetween(start, end)

定义从开始(包含)到结束(包含)的框边界。
start, end都是相对于当前行。 例如,“0”表示“当前行”,而“-1”表示在当前行之前一次,“5”表示当前行之后五次关闭。
参数:● start – 开始边界(包括)。 如果这是-sys.maxsize(或更低),则该框架是无限的。
           ● end – 结束边界(包括)。如果这是sys.maxsize(或更高),则该框架是无限的。

10.4 rowsBetween(start, end)

定义从开始(包含)到结束(包含)的框边界。
start, end都是相对于当前行。 例如,“0”表示“当前行”,而“-1”表示在当前行之前一次,“5”表示当前行之后五次关闭。
参数:● start – 开始边界(包括)。 如果这是-sys.maxsize(或更低),则该框架是无限的。
           ● end – 结束边界(包括)。如果这是sys.maxsize(或更高),则该框架是无限的。

11. class pyspark.sql.DataFrameReader(sqlContext)

用于从外部存储系统(例如文件系统,键值存储等)加载DataFrame的接口。 使用SQLContext.read()来访问这个。

11.1 format(source)

指定输入数据源格式。
参数:● source – string,数据源名称,例如:'json','parquet'。

people.json文件内容:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

>>> df = sqlContext.read.format('json').load('/test/people.json') 
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]

11.2 jdbc

(url,table,column=None,lowerBound=None,upperBound=None,numPartitions=None,predicates=None,properties=None)
构建一个DataFrame表示通过JDBC URL url命名的table和连接属性连接的数据库表。
column参数可用于对表进行分区,然后根据传递给此函数的参数并行检索它。
predicates参数给出了一个适合包含在WHERE子句中的列表表达式; 每一个都定义了DataFrame的一个分区。
注:不要在大型集群上并行创建太多分区; 否则Spark可能会使外部数据库系统崩溃。

参数:● url – 一个JDBC URL
           ● table – 表名称
      ● column – 用于分区的列
      ● lowerBound – 分区列的下限
      ● upperBound – 分区列的上限
      ● numPartitions – 分区的数量
      ● predicates – 表达式列表
      ● properties – JDBC数据库连接参数,任意字符串的标签/值的列表。通常至少应该包括一个“用户”和“密码”属性。
返回 : 一个DataFrame

11.3 json(path, schema=None)

加载一个JSON文件(每行一个对象)或一个存储JSON对象的字符串RDD(每个记录一个对象),并返回结果为:class`DataFrame`。
如果未指定schema参数,则此函数会经过一次输入以确定输入模式。
参数:● path - 字符串表示JSON数据集的路径,或者存储JSON对象的字符串的RDD
           ● schema – 输入模式的可选StructType。
你可以设置以下特定于JSON的选项来处理非标准的JSON文件:
* primitivesAsString (默认false): 将所有原始值推断为字符串类型
* allowComments (默认false): 忽略JSON记录中的Java / C++样式注释
* allowUnquotedFieldNames (默认false): 允许未加引号的JSON字段名称
* allowSingleQuotes (默认true): 允许除双引号外的单引号
* allowNumericLeadingZeros (默认false): 允许数字中的前导零(例如00012)

>>> df1 = sqlContext.read.json('/test/people.json')
>>> df1.dtypes
[('age', 'bigint'), ('name', 'string')]
>>> rdd = sc.textFile('/test/people.json')
>>> df2 = sqlContext.read.json(rdd)
>>> df2.dtypes
[('age', 'bigint'), ('name', 'string')]

11.4 load(path=None, format=None, schema=None, **options)

从数据源加载数据并将其作为:class`DataFrame`返回。
参数:● path - 可选字符串或文件系统支持的数据源的字符串列表
   ● format – 数据源格式的可选字符串。 默认为“parquet”
           ● schema – 输入模式的可选StructType。
           ● options – 所有其他字符串选项。

注:parquet_partitioned文件夹路径为:spark-1.6.2-bin-hadoop2.6python est_supportsqlparquet_partitioned
       people.json和people1.json文件路径为:spark-1.6.2-bin-hadoop2.6python est_supportsql

>>> df = sqlContext.read.load('/test/parquet_partitioned', opt1=True,opt2=1, opt3='str')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
>>> df = sqlContext.read.format('json').load(['/test/people.json','/test/people1.json'])
>>> df.dtypes
[('age', 'bigint'), ('aka', 'string'), ('name', 'string')]

11.5 option(key, value)

为基础数据源添加一个输入选项。

11.6 options(**options)

为基础数据源添加多个输入选项。

11.7 orc(path)

加载ORC文件,将结果作为DataFrame返回。
注:目前ORC支持只能与HiveContext一起使用。

11.8 parquet(*paths)

加载parquet文件, 将结果作为DataFrame返回。

>>> df = sqlContext.read.parquet('/test/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]

11.9 schema(schema)

指定输入的schema.
某些数据源(例如JSON)可以从数据自动推断输入模式。通过在这里指定模式,底层数据源可以跳过模式推断步骤,从而加速数据加载。
参数:● schema – 一个StructType对象

11.10 table(tableName)

以DataFrame的形式返回指定的表。
参数:● tableName – 字符串的表名称

>>> df = sqlContext.read.parquet('/test/parquet_partitioned')
>>> df.registerTempTable('tmpTable')
>>> sqlContext.read.table('tmpTable').dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]

11.11 text(paths)

加载一个文本文件并返回一个名为"value"的单个字符串列的[[DataFrame]]。
文本文件中的每一行都是生成的DataFrame中的新行。
参数:●  paths – 字符串或字符串列表,用于输入路径。

>>> df = sqlContext.read.text('/test/text-test.txt')
>>> df.collect()
[Row(value=u'hello'), Row(value=u'this')]

12. class pyspark.sql.DataFrameWriter(df)

用于将[[DataFrame]]写入外部存储系统(例如文件系统,键值存储等)的接口。使用DataFrame.write()来访问这个。

12.1 format(source)

指定基础输出数据源。
参数:●  source – 字符串,数据源的名称,例如 'json','parquet'。

>>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))

12.2 insertInto(tableName, overwrite=False)

将DataFrame的内容插入到指定的表中。
它要求DataFrame类的架构与表的架构相同。
可以覆盖任何现有的数据。

12.3 jdbc(url, table, mode=None, properties=None)

通过JDBC将DataFrame的内容保存到外部数据库表中。
注:不要在大型集群上并行创建太多分区; 否则Spark可能会使外部数据库系统崩溃。
参数:● url – 一个形式为jdbc:subprotocol:subname的JDBC URL
   ● table – 外部数据库中表的名称。
           ● mode – 指定数据已经存在时保存操作的行为:
      ● append: 将此DataFrame的内容附加到现有数据。
      ● overwrite: 覆盖现有数据。
      ● ignore: 如果数据已经存在,静默地忽略这个操作。
   ● error (默认): 如果数据已经存在,则抛出异常。
      ● properties – JDBC数据库连接参数,任意字符串标签/值的列表。 通常至少应该包括一个“用户”和“密码”属性。

12.4 json(path, mode=None)

以指定的路径以JSON格式保存DataFrame的内容。
参数:● path – 任何Hadoop支持的文件系统中的路径。
           ● mode –指定数据已经存在时保存操作的行为。
           ● append: 将此DataFrame的内容附加到现有数据。
           ● overwrite: 覆盖现有数据。
           ● ignore: 如果数据已经存在,静默地忽略这个操作。
           ● error (默认): 如果数据已经存在,则抛出异常。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.write.json('file:///data/dfjson')
[root@slave1 dfjson]# ll
total 8
-rw-r--r-- 1 root root  0 Nov 24 12:08 part-r-00000-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root 25 Nov 24 12:08 part-r-00001-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root  0 Nov 24 12:08 part-r-00002-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root 23 Nov 24 12:08 part-r-00003-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root  0 Nov 24 12:08 _SUCCESS
[root@slave1 dfjson.json]# cat part*
{"name":"Alice","age":2}
{"name":"Bob","age":5}

12.5 mode(saveMode)

指定数据或表已经存在的行为。
选项包括:
  append: 将此DataFrame的内容附加到现有数据。
  overwrite: 覆盖现有数据。
  error: 如果数据已经存在,则抛出异常。
  ignore: 如果数据已经存在,静默地忽略这个操作。

>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))

12.6 option(key, value)

 添加一个底层数据源的输出选项。

12.7 options(**options)

添加底层数据源的多个输出选项。

12.8 orc(path, mode=None, partitionBy=None)

以指定的路径以ORC格式保存DataFrame的内容。
注:目前ORC支持只能与HiveContext一起使用。
参数:● path – 任何Hadoop支持的文件系统中的路径。
           ● mode –指定数据已经存在时保存操作的行为:
      append: 将此DataFrame的内容附加到现有数据。
      overwrite: 覆盖现有数据。
      ignore: 如果数据已经存在,静默地忽略这个操作。
      error (默认): 如果数据已经存在,则抛出异常。
           ● partitionBy – 分区列的名称。

>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))

12.9 parquet(path, mode=None, partitionBy=None)

将DataFrame的内容以Parquet格式保存在指定的路径中。
参数:● path – 任何Hadoop支持的文件系统中的路径。
           ● mode – 指定数据已经存在时保存操作的行为。
      append: 将此DataFrame的内容附加到现有数据。
      overwrite: 覆盖现有数据。
      ignore: 如果数据已经存在,静默地忽略这个操作。
      error (默认): 如果数据已经存在,则抛出异常。
     ● partitionBy – 分区列的名称。

>>> df.write.parquet("file:///data/dfparquet")

[root@slave1 dfparquet]# ll
total 24
-rw-r--r-- 1 root root 285 Nov 24 12:23 _common_metadata
-rw-r--r-- 1 root root 750 Nov 24 12:23 _metadata
-rw-r--r-- 1 root root 285 Nov 24 12:23 part-r-00000-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 534 Nov 24 12:23 part-r-00001-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 285 Nov 24 12:23 part-r-00002-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 523 Nov 24 12:23 part-r-00003-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root   0 Nov 24 12:23 _SUCCESS

12.10 partitionBy(*cols)

按文件系统上的给定列对输出进行分区。
如果指定,则输出将在文件系统上进行布局,类似于Hive的分区方案。
参数:● cols – 列的名称.

>>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))

12.11 save(path=None, format=None, mode=None, partitionBy=None, **options)

将DataFrame的内容保存到数据源。
数据源由format和一组options指定。 如果未指定format,则将使用由spark.sql.sources.default配置的缺省数据源。
参数:● path – Hadoop支持的文件系统中的路径。
           ● format – 用于保存的格式。
           ● mode – 指定数据已经存在时保存操作的行为。
      append: 将此DataFrame的内容附加到现有数据。
      overwrite: 覆盖现有数据。
      ignore: 如果数据已经存在,静默地忽略这个操作。
      error (默认): 如果数据已经存在,则抛出异常。
            ● partitionBy – 分区列的名称。
            ● options – all other string options

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.write.mode('append').save("file:///data/dfsave")

12.12 saveAsTable(name, format=None, mode=None, partitionBy=None, **options)

将DataFrame的内容保存为指定的表格。
在表已经存在的情况下,这个函数的行为依赖于由mode函数指定的保存模式(默认为抛出异常)。 当模式为覆盖时,[[DataFrame]]的模式不需要与现有表的模式相同。

append: 将此DataFrame的内容附加到现有数据。
overwrite: 覆盖现有数据。
error: 如果数据已经存在,则抛出异常。
ignore: 如果数据已经存在,静默地忽略这个操作。

参数:● name – 表名
           ● format – 用于保存的格式
           ● mode – 追加,覆盖,错误,忽略之一(默认:错误)
           ● partitionBy – 分区列的名称
           ● options – 所有其他字符串选项

12.13 text(path)

将DataFrame的内容保存在指定路径的文本文件中。
DataFrame必须只有一个字符串类型的列。每行成为输出文件中的新行。

免责声明:文章转载自《《Spark Python API 官方文档中文版》 之 pyspark.sql (三)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇arcgis api的三种查询实现为企业服务器配置RAID0、raid1、 raid10、raid5、raid6、等常见RAID下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

[转]SQLserver字符串分割函数

一、按指定符号分割字符串,返回分割后的元素个数,方法很简单,就是看字符串中存在多少个分隔符号,然后再加一,就是要求的结果。 CREATE functionGet_StrArrayLength ( @str varchar(1024), --要分割的字符串 @split varchar(10) --分隔符号 ) returns int as b...

python之读取文件的MD5码并重命名文件

由于自己的手机经常备份,备份后原来的图片视频没有删除,下次再备份的时候移动硬盘上又多了很多重复图片,于是想着能不能用提取MD5校验码的方式识别出重复的文件,然后处理下硬盘里已经重复的内容. 考虑到最近在学python,于是通过上网查资料和内容,借很多大牛的轮子来试了一下,搞了两个晚上,终于算是阶段性完成,能达到目标的方法.主要难点有三个: 获取文件的MD...

python爬虫03:那个叫做 Urllib 的库让我们的 python 假装是浏览器

相信你已经摸清了 浏览器各种请求的套路 也知道了怎么在手机上进行请求和返回数据的抓取 那么接下来我们就开始来使用 python 了 代码 lu 起来 那么 怎么用 python 写各种请求呢? 今天要给大家介绍的就是 Urllib 这可是 python 内置的库 有了它 我们写代码就轻松了 腰也不疼了 腿也不酸了 头发也不秃了 那么怎么使用Urll...

2020系统综合实践 第7次实践作业 11组

目录 1.在树莓派中安装opencv库 1.1 安装依赖 1.2 下载OpenCV源码 1.3 安装pip 1.4 安装Python虚拟机 1.5 编译OpenCV 1.6 安装OpenCV 2.使用opencv和python控制树莓派的摄像头 3.利用树莓派的摄像头实现人脸识别 facerec_on_raspberry_pi.py face...

Python selenium 延时的几种方法

解决网页加载缓慢的几种方法: 一、显性等待WebDriverWait,配合该类的until()和until_not()方法,就能够根据判断条件而进行灵活地等待 1 from selenium import webdriver 2 from selenium.webdriver.support.wait import WebDriverWait 3 4...

Python Web 开发的十个框架【转载】

Python 是一门动态、面向对象语言。其最初就是作为一门面向对象语言设计的,并且在后期又加入了一些更高级的特性。除了语言本身的设计目的之外,Python标准 库也是值得大家称赞的,Python甚至还自带服务器。 其它方面,Python拥有足够多的免费数据函数库、免费的Web网页模板系统、还有与Web服务 器进行交互的库、这些都可以设计到你的Web应用程序...