PySpark DataFrame 添加自增 ID

摘要:
此函数未在DataFrame API中实现,因此只能以其他方式实现,或者转换为RDD,然后使用RDD的zipWithIndex运算符实现。从pyspark创建DataFrame对象。sqllimportSarkSessionpark=火花会话。建设者getOrCreate()df=火花。createDataFramedf Show()输出:+----+----+|age|name|+----+---+|18|Alice||22|Sitoi||22| Shitao||7|Tom||17|De||45|Apple|+----+方法1:单调递增_Id()使用内置函数单调递增_IID()。因为Spark将有分区,所以生成的ID必须单调增加且唯一,但不能连续。df=df.withColumndf.Show()输出:+-------+-------+----------+|age|name|id|+-------+--------+|18|Alice|8589354592||22|Sitoi|17179869184||22| Shitao|25769803776||7|Tom|42949672960||17|De|5153960752||45|Apple|60129542144|+-------+---------+如果读取本地单个CSV文件或JSON文件,则id将不断增加且唯一。
PySpark DataFrame 添加自增 ID

本文原始地址:https://sitoi.cn/posts/62634.html

在用 Spark 处理数据的时候,经常需要给全量数据增加一列自增 ID 序号,在存入数据库的时候,自增 ID 也常常是一个很关键的要素。
在 DataFrame 的 API 中没有实现这一功能,所以只能通过其他方式实现,或者转成 RDD 再用 RDD 的 zipWithIndex 算子实现。
下面呢就介绍三种实现方式。

创建 DataFrame 对象

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        {"name": "Alice", "age": 18},
        {"name": "Sitoi", "age": 22},
        {"name": "Shitao", "age": 22},
        {"name": "Tom", "age": 7},
        {"name": "De", "age": 17},
        {"name": "Apple", "age": 45}
    ]
)
df.show()

输出:

+---+------+
|age|  name|
+---+------+
| 18| Alice|
| 22| Sitoi|
| 22|Shitao|
|  7|   Tom|
| 17|    De|
| 45| Apple|
+---+------+

方式一:monotonically_increasing_id()

使用自带函数 monotonically_increasing_id() 创建,由于 spark 会有分区,所以生成的 ID 保证单调增加且唯一,但不是连续的

优点:对于没有分区的文件,处理速度快。
缺点:由于 spark 的分区,会导致,ID 不是连续增加。

df = df.withColumn("id", monotonically_increasing_id())
df.show()

输出:

+---+------+-----------+
|age|  name|         id|
+---+------+-----------+
| 18| Alice| 8589934592|
| 22| Sitoi|17179869184|
| 22|Shitao|25769803776|
|  7|   Tom|42949672960|
| 17|    De|51539607552|
| 45| Apple|60129542144|
+---+------+-----------+

如果读取本地的单个 CSV 文件 或 JSON 文件,ID 会是连续增加且唯一的。

方法二:窗口函数

利用窗口函数:设置窗口函数的分区以及排序,因为是全局排序而不是分组排序,所有分区依据为空,排序规则没有特殊要求也可以随意填写

优点:保证 ID 连续增加且唯一
缺点:运行速度满,并且数据量过大会爆内存,需要排序,会改变原始数据顺序。

from pyspark.sql.functions import row_number

spec = Window.partitionBy().orderBy("age")
df = df.withColumn("id", row_number().over(spec))
df.show()

输出:

+---+------+---+
|age|  name| id|
+---+------+---+
|  7|   Tom|  1|
| 17|    De|  2|
| 18| Alice|  3|
| 22| Sitoi|  4|
| 22|Shitao|  5|
| 45| Apple|  6|
+---+------+---+

方法三:RDD 的 zipWithIndex 算子

转成 RDD 再用 RDD 的 zipWithIndex 算子实现

优点:保证 ID 连续 增加且唯一。
缺点:运行速度慢。

from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import StructField, LongType

spark = SparkSession.builder.getOrCreate()

schema = df.schema.add(StructField("id", LongType()))
rdd = df.rdd.zipWithIndex()


def flat(l):
    for k in l:
        if not isinstance(k, (list, tuple)):
            yield k
        else:
            yield from flat(k)


rdd = rdd.map(lambda x: list(flat(x)))
df = spark.createDataFrame(rdd, schema)
df.show()

输出:

+---+------+---+
|age|  name| id|
+---+------+---+
| 18| Alice|  0|
| 22| Sitoi|  1|
| 22|Shitao|  2|
|  7|   Tom|  3|
| 17|    De|  4|
| 45| Apple|  5|
+---+------+---+

免责声明:文章转载自《PySpark DataFrame 添加自增 ID》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇php FPDF类库应用实现代码cx_Oracle连接oracle数据库下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

DataFrame和python中数据结构互相转换

楔子 有时候DataFrame,我们不一定要保存成文件、或者入数据库,而是希望保存成其它的格式,比如字典、列表、json等等。当然,读取DataFrame也不一定非要从文件、或者数据库,根据现有的数据生成DataFrame也是可以的,那么该怎么做呢?我们来看一下 DataFrame转成python中的数据格式 转成json DataFrame转成json,...

Spark操作dataFrame进行写入mysql,自定义sql的方式

业务场景:   现在项目中需要通过对spark对原始数据进行计算,然后将计算结果写入到mysql中,但是在写入的时候有个限制:   1、mysql中的目标表事先已经存在,并且当中存在主键,自增长的键id   2、在进行将dataFrame写入表的时候,id字段不允许手动写入,因为其实自增长的 要求:   1、写入数据库的时候,需要指定字段写入,也就是说,只...

《Spark Python API 官方文档中文版》 之 pyspark.sql (三)

摘要:在Spark开发中,由于需要用Python实现,发现API与Scala的略有不同,而Python API的中文资料相对很少。每次去查英文版API的说明相对比较慢,还是中文版比较容易get到所需,所以利用闲暇之余将官方文档翻译为中文版,并亲测Demo的代码。在此记录一下,希望对那些对Spark感兴趣和从事大数据开发的人员提供有价值的中文资料,对PyS...

统计学习方法 | 第2章 感知机 | 补充

egend函数 legend函数介绍:在轴上方一个图例 legend常用属性: loc:图例摆放的位置,值请看下方文档 ncol; 图例列数,它的值决定了图例一共有多少列,详细请看下方文档 label:如果需要完全控制图例中的内容可以不写plot中的label,写在legend中,请看图中示例 sklearn的datasets使用 sklearn.dat...

pyspark 通过 json 字符串 创建DataFrame

1、开发环境 python版本:3.6 spark版本:2.3.1 pyspark:2.3.1 2、脚本 from pyspark import SparkConf,SparkContextfrom pyspark.sql import SQLContext,HiveContextfrom pyspark.sql.types import *####1、...

pandas重塑层次化索引(stack()和unstack()函数解析)

在数据处理时,有时需要对数据的结构进行重排,也称作是重塑(Reshape)或者轴向旋转(Pivot)。而运用层次化索引可为 DataFrame 的数据重排提供良好的一致性。在 pandas 中提供了实现重塑的两个函数,即 stack() 函数和 unstack() 函数。常见的数据层次化结构有两种,一种是表格,如图 1 所示;另一种是“花括号”,如图 2...