Spark SQL Programming (Part 7)

Abstract:
An RDD is an immutable, partitioned collection that can be operated on in parallel. A DataFrame is a distributed collection of data organized into named columns, and is equivalent to a relational table in Spark SQL. What they have in common is that both are designed for distributed computation. Note: with the RDD API, most of the code that runs on the executors is Python code and only a small part is Java bytecode, whereas Spark SQL runs entirely as Java bytecode on the executors, so it offers better performance and flexibility than the RDD API.
1. Creating a DataFrame

1.1 Differences Between RDD and DataFrame

  • An RDD (Resilient Distributed Dataset) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel
  • A DataFrame is a distributed collection of data organized into named columns, equivalent to a relational table in Spark SQL. What they have in common is that both are designed for distributed computation

Note: with the RDD API, most of what runs on the executors is Python code and only a small part is Java bytecode, while with Spark SQL everything that runs on the executors is Java bytecode, so its performance and flexibility are better than the RDD API's!
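
To make the contrast concrete, here is a minimal sketch (the data and app name are illustrative) that computes the same average age with both APIs; the RDD version executes its Python lambdas in Python worker processes, while the DataFrame version is planned by Catalyst and runs as JVM bytecode on the executors:

from pyspark.sql import SparkSession

session = SparkSession.builder.appName("rdd vs dataframe").getOrCreate()
sc = session.sparkContext

pairs = [("Alice", 20), ("Bob", 30)]

# RDD style: the lambdas below run in Python workers
total, count = sc.parallelize(pairs) \
    .map(lambda x: (x[1], 1)) \
    .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
print(total / count)  # 25.0

# DataFrame style: the plan is optimized and executed on the JVM
df = session.createDataFrame(pairs, ["name", "age"])
df.selectExpr("avg(age)").show()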

1.2 From a List of Tuples

# coding=utf-8

from pyspark.sql import SparkSession

session = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = session.sparkContext

# Create a DataFrame from a list of tuples
a = [('Alice', 1)]
df = session.createDataFrame(a, ['name', 'age'])
df.show()

+-----+---+
| name|age|
+-----+---+
|Alice|  1|
+-----+---+

1.3 From a List of Dicts

# Create a DataFrame from a list of dicts
b = [{'name': 'Alice', 'age': 1}]
df = session.createDataFrame(b)
df.show()
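
In recent Spark versions, inferring the schema from a list of dicts emits a deprecation warning; a small sketch of the recommended alternative, passing Row objects instead (same data, same result):

from pyspark.sql import Row

b = [Row(name='Alice', age=1)]
df = session.createDataFrame(b)
df.show()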

1.4 From an RDD

# Create a DataFrame from an RDD
c = [('Alice', 1)]
rdd = sc.parallelize(c)
df = session.createDataFrame(data=rdd, schema=['name', 'age'])
df.show()

1.5 From an RDD and Row Objects

# Create a DataFrame from an RDD of Row objects
from pyspark.sql import Row

d = [('Alice', 1)]
rdd = sc.parallelize(d)
Person = Row("name", "age")
person = rdd.map(lambda r: Person(*r))
df = session.createDataFrame(person)
df.show()

1.6 From an RDD and a StructType Schema

# Create a DataFrame from an RDD with an explicit StructType schema

from pyspark.sql.types import StructType, StringType, IntegerType, StructField

e = [('Alice', 1)]
rdd = sc.parallelize(e)
schema = StructType(
    [
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)
    ]
)
df = session.createDataFrame(rdd, schema)
df.show()

1.7 From a pandas DataFrame

# Create a DataFrame from a pandas DataFrame
import pandas as pd

# Method 1
f = [('Alice', 1)]
df = session.createDataFrame(data=pd.DataFrame(f), schema=['name', 'age'])
df.show()

# Method 2
pdf = pd.DataFrame([("LiLei", 18), ("HanMeiMei", 17)], columns=["name", "age"])
df = session.createDataFrame(pdf)
df.show()
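
The reverse direction also works: toPandas() collects a DataFrame back to the driver as a pandas DataFrame, so it should only be used on small results. A sketch; the Arrow setting (Spark 3.x key shown) is optional and assumes pyarrow is installed:

# Optional: use Apache Arrow to speed up Spark <-> pandas conversion
session.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

pdf_back = df.toPandas()  # collects all rows to the driver
print(pdf_back)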

1.8 From Files

# person.json
{"name": "rose", "age": 18}
{"name": "lila", "age": 19}

# person.csv
name,age
rose,18
lila,19

# person.txt
rose
lila

Reading the files:

# Read from files
df1 = session.read.json('person.json')
df1.show()

df2 = session.read.load('person.json', format='json')
df2.show()

df3 = session.read.csv('person.csv', sep=',', header=True)
df3.show()

# Paths can also point to HDFS
df4 = session.read.text(paths='person.txt')
df4.show()

Output:

+---+----+
|age|name|
+---+----+
| 18|rose|
| 19|lila|
+---+----+

+---+----+
|age|name|
+---+----+
| 18|rose|
| 19|lila|
+---+----+

+----+---+
|name|age|
+----+---+
|rose| 18|
|lila| 19|
+----+---+

+-----+
|value|
+-----+
| rose|
| lila|
+-----+
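
Note that read.csv treats every column as a string unless a schema is supplied; a small sketch using the inferSchema option (which costs an extra pass over the data) to get typed columns:

df5 = session.read.csv('person.csv', sep=',', header=True, inferSchema=True)
df5.printSchema()  # age should now be inferred as an integer instead of a string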

1.9 From a MySQL Database

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as F


sc = SparkContext("local", appName="mysqltest")
sqlContext = SQLContext(sc)
df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/mydata?user=root&password=mysql&"
        "useUnicode=true&characterEncoding=utf-8&useJDBCCompliantTimezoneShift=true&"
        "useLegacyDatetimeCode=false&serverTimezone=UTC", dbtable="detail_data").load()
df.show(n=5)
sc.stop()

Note: you need to download mysql-connector-java.jar first and make it available to Spark.
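
With the connector jar on the classpath (for example passed via spark-submit --jars mysql-connector-java.jar), the connection settings can also be supplied as separate options instead of being packed into the URL; a sketch with hypothetical credentials, where the driver class shown is the one used by Connector/J 8.x:

df = sqlContext.read.format("jdbc").options(
    url="jdbc:mysql://localhost:3306/mydata",
    driver="com.mysql.cj.jdbc.Driver",  # use com.mysql.jdbc.Driver for Connector/J 5.x
    dbtable="detail_data",
    user="root",
    password="mysql").load()
df.show(n=5)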

1.10 The toDF Method

rdd = sc.parallelize([("LiLei", 15, 88), ("HanMeiMei", 16, 90), ("DaChui", 17, 60)])
df = rdd.toDF(["name", "age", "score"])
df.show()

1.11 From a Hive Table

session.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
session.sql("LOAD DATA LOCAL INPATH 'data/kv1.txt' INTO TABLE src")
df = spark.sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")
df.show(5)

1.12 From a Parquet File

df = session.read.parquet("data/users.parquet")
df.show()


2. Saving a DataFrame to Files

A DataFrame can be saved as a CSV, JSON or Parquet file, or as a Hive table:

# Save as a CSV file
df = session.read.format("json").load("data/people.json")
df.write.format("csv").option("header","true").save("people.csv")

# Convert to an RDD first, then save as a text file
df.rdd.saveAsTextFile("people.txt")

# Save as a JSON file
df.write.json("people.json")

# Save as a Parquet file: a compressed columnar format that uses little storage and is the format Spark loads fastest
df.write.partitionBy("age").format("parquet").save("namesAndAges.parquet")
df.write.parquet("people.parquet")

# Save as a Hive table
df.write.bucketBy(42, "name").sortBy("age").saveAsTable("people")
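
By default write fails if the target path or table already exists; a small sketch of the mode option (assuming the same df), which controls that behaviour:

# mode can be "error" (the default), "overwrite", "append" or "ignore"
df.write.mode("overwrite").format("csv").option("header", "true").save("people.csv")
df.write.mode("append").parquet("people.parquet")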

3. Common API Operations

3.1 Action Operations

Common action operations include show, count, collect, describe, take, head and first.

# coding=utf-8
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, StructField

session = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = session.sparkContext

info_list = [("rose", 15, 'F'), ("lila", 16, 'F'), ("john", 17, 'M')]
schema = ["name", "age", "gender"]
df = session.createDataFrame(info_list, schema)

df.show(n=2, truncate=True, vertical=False)
print(df.count())

data_list = df.collect()
print(data_list)

print(df.first())	# Return the first row
print(df.take(2))	# Return the first n rows as a list of Row objects
print(df.head(2))	# Return the first n rows
print(df.describe())	# Summary statistics (returns a DataFrame)
df.printSchema()	# Print the schema as a tree to the console

Output:

# df.show()
+----+---+------+
|name|age|gender|
+----+---+------+
|rose| 15|     F|
|lila| 16|     F|
+----+---+------+
only showing top 2 rows

# df.count()
3

# df.collect()
[Row(name='rose', age=15, gender='F'), Row(name='lila', age=16, gender='F'), Row(name='john', age=17, gender='M')]

# df.first()
Row(name='rose', age=15, gender='F')

# df.take(2)
[Row(name='rose', age=15, gender='F'), Row(name='lila', age=16, gender='F')]

# df.head(2)
[Row(name='rose', age=15, gender='F'), Row(name='lila', age=16, gender='F')]

# df.describe()
DataFrame[summary: string, name: string, age: string, gender: string]

# df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)

3.2 RDD-like Operations

A DataFrame and an RDD can be converted into each other:

# df ---> rdd
df.rdd

# rdd ---> df
rdd.toDF(schema)

After a DataFrame is converted to an RDD, the usual RDD operations are supported, such as distinct, cache, sample, foreach, intersect, except, map, flatMap and filter, but this is less flexible:

# coding=utf-8
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.sql.functions import *

session = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = session.sparkContext

# RDD-like operations: convert the DataFrame to an RDD with .rdd
data_list = [("Hello World",), ("Hello Python",), ("Hello Spark",)]
schema = ['value']
df = session.createDataFrame(data_list, schema)
df.show()

+------------+
|       value|
+------------+
| Hello World|
|Hello Python|
| Hello Spark|
+------------+

rdd = df.rdd  # Convert to an RDD
print(rdd.collect())  # [Row(value='Hello World'), Row(value='Hello Python'), Row(value='Hello Spark')]

1. map

# Convert to upper case, then back to a DataFrame
df_rdd = rdd.map(lambda x: Row(x[0].upper()))
df_rdd.toDF(schema).show()

+------------+
|       value|
+------------+
| HELLO WORLD|
|HELLO PYTHON|
| HELLO SPARK|
+------------+

2. flatMap

# df_flat = rdd.flatMap(lambda x: x[0].split(" "))
# print(df_flat.collect())    # ['Hello', 'World', 'Hello', 'Python', 'Hello', 'Spark']

df_flat = rdd.flatMap(lambda x: x[0].split(" ")).map(lambda x: Row(x)).toDF(schema)
df_flat.show()

+------+
| value|
+------+
| Hello|
| World|
| Hello|
|Python|
| Hello|
| Spark|
+------+

3. filter

# Keep only the rows whose value ends with 'Python'
df_filter = rdd.filter(lambda x: x[0].endswith('Python'))
print(df_filter.collect())
df_filter.toDF(schema).show()

[Row(value='Hello Python')]

+------------+
|       value|
+------------+
|Hello Python|
+------------+

4. distinct (deduplication):

df_flat.distinct().show()

+------+
| value|
+------+
| World|
| Hello|
|Python|
| Spark|
+------+

5. cache:

df.cache()  # Cache the DataFrame
df.unpersist()  # Remove it from the cache

6. sample:

df_sample = df.sample(False, 0.6, 0)
df_sample.show()

+------------+
|       value|
+------------+
| Hello World|
|Hello Python|
| Hello Spark|
+------------+

7. intersect (intersection):

df2 = session.createDataFrame([["Hello World"], ["Hello Scala"], ["Hello Spark"]]).toDF("value")

df_intersect = df.intersect(df2)
df_intersect.show()

+-----------+
|      value|
+-----------+
|Hello Spark|
|Hello World|
+-----------+

8. exceptAll (difference):

# Rows in df that are not in df2
df_except = df.exceptAll(df2)
df_except.show()

3.2.1 Differences Between map, flatMap and filter on the Converted RDD

1. rdd.map

def func_1(row):
    print(row)	

    return row[0].upper()

rdd1 = rdd.map(lambda row: func_1(row))
print(rdd1.collect())	# ['HELLO WORLD', 'HELLO PYTHON', 'HELLO SPARK']

# Each row passed to func_1
Row(value='Hello Spark')
Row(value='Hello Python')
Row(value='Hello World')

2. rdd.filter

def func_2(row):
    print(row)

    return row

rdd2 = rdd.filter(lambda row: func_2(row))
print(rdd2.collect())	# [Row(value='Hello World'), Row(value='Hello Python'), Row(value='Hello Spark')]

# Each row passed to func_2
Row(value='Hello Spark')
Row(value='Hello Python')
Row(value='Hello World')

3. rdd.flatMap

def func_3(row):
    print(row)
    print(dir(row))
    print(len(row))	# 1

    return row[0].split(" ")

rdd3 = rdd.flatMap(lambda row: func_3(row))
print(rdd3.collect())	# ['Hello', 'World', 'Hello', 'Python', 'Hello', 'Spark']

# Each row passed to func_3
Row(value='Hello Spark')
Row(value='Hello Python')
Row(value='Hello World')

Summary

  • Each row is a Row object, similar to a list; in the examples above every row contains only one element
  • filter still returns Row objects, whereas map and flatMap return plain Python objects

3.3 SQL-like Operations

sql 操作比类 rdd 操作更为灵活,包括查询 select、selectExpr、where、表连接 join、union、unionAll、表分组 groupby、agg、pivot 等:

# coding=utf-8
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, StructField
from pyspark.sql.functions import *

session = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sc = session.sparkContext

info_list = [("rose", 15, 'F'), ("lila", 16, 'F'), ("john", 17, 'M'), ('david', 18, None)]
schema = ["name", "age", "gender"]
df = session.createDataFrame(info_list, schema)

df.show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

3.3.1 Queries

Common methods: select, filter, selectExpr, where

# Select one column and limit the number of rows returned
df.select('name').limit(2).show()

+----+
|name|
+----+
|rose|
|lila|
+----+

# Select multiple columns and compute on one of them
df.select('name', df['age'] + 1).show()

+-----+---------+
| name|(age + 1)|
+-----+---------+
| rose|       16|
| lila|       17|
| john|       18|
|david|       19|
+-----+---------+

# Select multiple columns, compute on one of them, then rename the computed column
df.select('name', -df['age'] + 2021).toDF('name', 'birthday').show()
+-----+--------+
| name|birthday|
+-----+--------+
| rose|    2006|
| lila|    2005|
| john|    2004|
|david|    2003|
+-----+--------+

# selectExpr with a registered UDF and column aliases
import datetime

session.udf.register('getBirthYear', lambda age: datetime.datetime.now().year - age)
df.selectExpr('name', 'getBirthYear(age) as birth_year', 'UPPER(gender) as gender').show()

+-----+----------+------+
| name|birth_year|gender|
+-----+----------+------+
| rose|      2006|     F|
| lila|      2005|     F|
| john|      2004|     M|
|david|      2003|  null|
+-----+----------+------+
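
The same result can be obtained without registering a SQL UDF by wrapping a Python function with pyspark.sql.functions.udf and using it directly in select; a minimal sketch assuming the same df:

import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

get_birth_year = F.udf(lambda age: datetime.datetime.now().year - age, IntegerType())
df.select('name', get_birth_year(df['age']).alias('birth_year')).show()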

# where conditions
df.where('gender="M" and age=17').show()
+----+---+------+
|name|age|gender|
+----+---+------+
|john| 17|     M|
+----+---+------+

# filter queries
df.filter(df['age'] > 16).show()
df.filter('gender = "M"').show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

+----+---+------+
|name|age|gender|
+----+---+------+
|john| 17|     M|
+----+---+------+

3.3.2 Joins and Unions (join, union)

df_score = session.createDataFrame([('john', 'M', 88), ('rose', 'F', 90), ('david', 'M', 50)],
                                   schema=['name', 'gender', 'score'])
df_score.show()

+-----+------+-----+
| name|gender|score|
+-----+------+-----+
| john|     M|   88|
| rose|     F|   90|
|david|     M|   50|
+-----+------+-----+

# Join on a single column
df.join(df_score.select('name', 'score'), 'name').show()

+-----+---+------+-----+
| name|age|gender|score|
+-----+---+------+-----+
|david| 18|  null|   50|
| john| 17|     M|   88|
| rose| 15|     F|   90|
+-----+---+------+-----+

# Join on multiple columns
df.join(df_score, ['name', 'gender']).show()

+----+------+---+-----+
|name|gender|age|score|
+----+------+---+-----+
|john|     M| 17|   88|
|rose|     F| 15|   90|
+----+------+---+-----+


# Specify the join type: "inner", "left", "right", "outer", "semi", "full", "leftanti", "anti", etc.
df.join(df_score, ['name', 'gender'], 'right').show()

+-----+------+----+-----+
| name|gender| age|score|
+-----+------+----+-----+
| john|     M|  17|   88|
|david|     M|null|   50|
| rose|     F|  15|   90|
+-----+------+----+-----+

# Specify the join condition explicitly
df_mark = df_score.withColumnRenamed('gender', 'sex')
df_mark.show()

+-----+---+-----+
| name|sex|score|
+-----+---+-----+
| john|  M|   88|
| rose|  F|   90|
|david|  M|   50|
+-----+---+-----+

df.join(df_mark, (df['name'] == df_mark['name']) & (df['gender'] == df_mark['sex']), 'inner').show()

+----+---+------+----+---+-----+
|name|age|gender|name|sex|score|
+----+---+------+----+---+-----+
|john| 17|     M|john|  M|   88|
|rose| 15|     F|rose|  F|   90|
+----+---+------+----+---+-----+

# Union (concatenate) two DataFrames
df_student = session.createDataFrame([("Jim", 18, "male"), ("Lily", 16, "female")], schema=["name", "age", "gender"])
df.union(df_student).show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
|  Jim| 18|  male|
| Lily| 16|female|
+-----+---+------+
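
union matches columns by position; when the two DataFrames have the same columns in a different order, unionByName (available since Spark 2.3) matches them by name instead. A sketch with a hypothetical df_student2 whose columns are reordered:

df_student2 = session.createDataFrame([(18, "Jim", "male")], schema=["age", "name", "gender"])
df.unionByName(df_student2).show()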

3.3.3 Grouping and Aggregation (groupBy, agg)

# Grouping with groupBy
from pyspark.sql import functions as F

# Group by gender, then take the maximum age in each group
df.groupBy('gender').max('age').show()

+------+--------+
|gender|max(age)|
+------+--------+
|     F|      16|
|  null|      18|
|     M|      17|
+------+--------+


# Aggregate after grouping with groupBy and agg; mean computes the average
df.groupBy('gender').agg(F.mean('age').alias('mean_age'), F.collect_list('name').alias('names')).show()

+------+--------+------------+
|gender|mean_age|       names|
+------+--------+------------+
|     F|    15.5|[rose, lila]|
|  null|    18.0|     [david]|
|     M|    17.0|      [john]|
+------+--------+------------+

df.groupBy('gender').agg(F.expr('avg(age)'), F.expr('collect_list(name)')).show()

+------+--------+------------------+
|gender|avg(age)|collect_list(name)|
+------+--------+------------------+
|     F|    15.5|      [rose, lila]|
|  null|    18.0|           [david]|
|     M|    17.0|            [john]|
+------+--------+------------------+

Grouped pivot:

# Pivot after grouping: groupBy + pivot
df_student = session.createDataFrame([("LiLei", 18, "male", 1), ("HanMeiMei", 16, "female", 1),
                                   ("Jim", 17, "male", 2), ("DaChui", 20, "male", 2)]).toDF("name", "age", "gender",
                                                                                            "class")
df_student.show()
df_student.groupBy("class").pivot("gender").max("age").show()

+---------+---+------+-----+
|     name|age|gender|class|
+---------+---+------+-----+
|    LiLei| 18|  male|    1|
|HanMeiMei| 16|female|    1|
|      Jim| 17|  male|    2|
|   DaChui| 20|  male|    2|
+---------+---+------+-----+

+-----+------+----+
|class|female|male|
+-----+------+----+
|    1|    16|  18|
|    2|  null|  20|
+-----+------+----+

Window functions:

# Window functions

df2 = session.createDataFrame([("LiLei", 78, "class1"), ("HanMeiMei", 87, "class1"),
                            ("DaChui", 65, "class2"), ("RuHua", 55, "class2")]) 
    .toDF("name", "score", "class")

df2.show()
dforder = df2.selectExpr("name", "score", "class",
                        "row_number() over (partition by class order by score desc) as order")

dforder.show()

+---------+-----+------+
|     name|score| class|
+---------+-----+------+
|    LiLei|   78|class1|
|HanMeiMei|   87|class1|
|   DaChui|   65|class2|
|    RuHua|   55|class2|
+---------+-----+------+

+---------+-----+------+-----+
|     name|score| class|order|
+---------+-----+------+-----+
|   DaChui|   65|class2|    1|
|    RuHua|   55|class2|    2|
|HanMeiMei|   87|class1|    1|
|    LiLei|   78|class1|    2|
+---------+-----+------+-----+
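
The same ranking can be written with the DataFrame window API instead of a SQL expression; a minimal sketch assuming the same df2:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.partitionBy("class").orderBy(F.desc("score"))
df2.withColumn("order", F.row_number().over(w)).show()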

3.4 Excel-like Operations

These include adding and dropping columns, replacing values, and dropping or filling null rows:

info_list = [("rose", 15, 'F'), ("lila", 16, 'F'), ("john", 17, 'M'), ('david', 18, None)]
schema = ["name", "age", "gender"]
df = session.createDataFrame(info_list, schema)

df.show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

1. Column operations:

# Add a column
df_new = df.withColumn('year', -df['age'] + 2021)
df_new.show()

+-----+---+------+----+
| name|age|gender|year|
+-----+---+------+----+
| rose| 15|     F|2006|
| lila| 16|     F|2005|
| john| 17|     M|2004|
|david| 18|  null|2003|
+-----+---+------+----+

# Reorder columns
df_new.select('name', 'age', 'year', 'gender').show()

+-----+---+----+------+
| name|age|year|gender|
+-----+---+----+------+
| rose| 15|2006|     F|
| lila| 16|2005|     F|
| john| 17|2004|     M|
|david| 18|2003|  null|
+-----+---+----+------+

# Drop a column
df_new.drop('year').show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

# Rename a column
df_new.withColumnRenamed('gender', 'sex').show()

+-----+---+----+----+
| name|age| sex|year|
+-----+---+----+----+
| rose| 15|   F|2006|
| lila| 16|   F|2005|
| john| 17|   M|2004|
|david| 18|null|2003|
+-----+---+----+----+

2. Sorting (sort, orderBy)

# Sort
df.sort(df['age'].desc()).show()  # use .asc() for ascending order

+-----+---+------+
| name|age|gender|
+-----+---+------+
|david| 18|  null|
| john| 17|     M|
| lila| 16|     F|
| rose| 15|     F|
+-----+---+------+

# Sort by multiple columns
df.orderBy(df['age'].desc(), df['gender'].desc()).show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
|david| 18|  null|
| john| 17|     M|
| lila| 16|     F|
| rose| 15|     F|
+-----+---+------+

3. Dropping and filling null rows:

# Drop rows that contain null values
df.na.drop().show()
# df.dropna().show()

# Fill null values
df.fillna('M').show()
# df.na.fill('M').show()
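
Both dropna and fillna also accept a subset of columns or a per-column dict, which gives finer control; a small sketch assuming the same df:

# Drop only the rows that are null in the gender column
df.na.drop(subset=['gender']).show()

# Fill nulls column by column with a dict
df.na.fill({'gender': 'M'}).show()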

4. Replacing values:

# Replace specific values
df.na.replace({"": "M", "david": "lisi"}).show()
# df.replace({"": "M", "david": "lisi"}).show()

+----+---+------+
|name|age|gender|
+----+---+------+
|rose| 15|     F|
|lila| 16|     F|
|john| 17|     M|
|lisi| 18|  null|
+----+---+------+

5. Deduplication:

# Deduplicate; by default all columns are compared
df2 = df.unionAll(df)
df2.show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

df2.dropDuplicates().show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| john| 17|     M|
|david| 18|  null|
| rose| 15|     F|
| lila| 16|     F|
+-----+---+------+

# Deduplicate on a subset of columns
df.dropDuplicates(['age']).show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| john| 17|     M|
|david| 18|  null|
| rose| 15|     F|
| lila| 16|     F|
+-----+---+------+

6. Other:

# Simple aggregation
df.agg({'name': 'count', 'age': 'avg'}).show()

+-----------+--------+
|count(name)|avg(age)|
+-----------+--------+
|          4|    16.5|
+-----------+--------+

# Summary statistics
df.describe().show()

+-------+-----+------------------+------+
|summary| name|               age|gender|
+-------+-----+------------------+------+
|  count|    4|                 4|     3|
|   mean| null|              16.5|  null|
| stddev| null|1.2909944487358056|  null|
|    min|david|                15|     F|
|    max| rose|                18|     M|
+-------+-----+------------------+------+

# Ages and genders whose frequency exceeds 0.5
df.stat.freqItems(('age', 'gender'), 0.5).show()

+-------------+----------------+
|age_freqItems|gender_freqItems|
+-------------+----------------+
|         [16]|             [F]|
+-------------+----------------+

4. Interacting with SQL

Besides the API operations above, a DataFrame can be registered as a temporary or global view and then manipulated with SQL statements; Hive tables can also be created, queried, updated and deleted this way.

4.1 Registering Views and Querying with SQL

1. Register as a temporary view:

info_list = [("rose", 15, 'F'), ("lila", 16, 'F'), ("john", 17, 'M'), ('david', 18, None)]
schema = ["name", "age", "gender"]
df = session.createDataFrame(info_list, schema)

df.show()

# Register as a temporary view; its lifetime is tied to the SparkSession
df.createOrReplaceTempView('people')

session.sql('select * from people WHERE age = "18" ').show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
|david| 18|  null|
+-----+---+------+

2. Register as a global view:

# Register as a global view; its lifetime is tied to the Spark application
df.createOrReplaceGlobalTempView('people1')
session.sql('select t.gender, collect_list(t.name) as names from global_temp.people1 t group by t.gender ').show()

+------+------------+
|gender|       names|
+------+------------+
|     F|[rose, lila]|
|  null|     [david]|
|     M|      [john]|
+------+------------+

session.newSession().sql('select * from global_temp.people1').show()

+-----+---+------+
| name|age|gender|
+-----+---+------+
| rose| 15|     F|
| lila| 16|     F|
| john| 17|     M|
|david| 18|  null|
+-----+---+------+

4.2 Interacting with Hive Tables

https://zhuanlan.zhihu.com/p/94375087
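
The linked article covers the details; as a minimal sketch (assuming a Hive-enabled Spark build, write access to the warehouse, and an illustrative table name), the key points are enabling Hive support on the SparkSession and then using sql and saveAsTable:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("hive example") \
    .enableHiveSupport() \
    .getOrCreate()

# Write a DataFrame as a Hive table, then query it back with SQL
df = spark.createDataFrame([("rose", 15)], ["name", "age"])
df.write.mode("overwrite").saveAsTable("default.people_hive")
spark.sql("SELECT * FROM default.people_hive").show()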
