pyspark获取和处理RDD数据

摘要:
弹性分布式数据集是一组不可变的JVM对象,可用于执行高速操作。它是Apache Spark的核心。

弹性分布式数据集(RDD)是一组不可变的JVM对象的分布集,可以用于执行高速运算,它是Apache Spark的核心。

在pyspark中获取和处理RDD数据集的方法如下:

1. 首先是导入库和环境配置(本测试在linux的pycharm上完成)

import os
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"

conf = SparkConf().setAppName('test_rdd')
sc = SparkContext('local', 'test', conf=conf)
spark = SparkSession(sc)

2. 然后,提供hdfs分区数据的路径或者分区表名

txt_File = r"hdfs://host:port/apps/hive/warehouse/数据库名.db/表名/分区名/part-m-00029.deflate"  # part-m-00029.deflate
# txt_File = r"hdfs://host:port/apps/hive/warehouse/数据库名.db/表名"  # hive table,即也可直接根据表名读取

3. sc.textFile进行读取,得到RDD格式数据<还可以用 spark.sparkContext.parallelize(data) 来获取RDD数据>,参数中还可设置数据被划分的分区数

txt_ = sc.textFile(txt_File) 

4. 基本操作:

type(txt_):显示数据类型,这时属于 'pyspark.rdd.RDD'

pyspark获取和处理RDD数据第1张

txt_.first():获取第一条数据

txt_.take(2):获取前2条数据,形成长度为2的list

txt_.take(2)[1].split('1')[1]:表示获取前两条中的第[1]条数据(也就是第2条,因为python的索引是从0开始的),并以 '1'字符分隔开(这要看你的表用什么作为分隔符的),形成list,再获取该list的第2条数据

txt_.map(lambda x:x.split('1')):使用lambda函数和map函数快速处理每一行数据,这里表示将每一行以 '1'字符分隔开,每一行返回一个list;此时数据结构是:'pyspark.rdd.PipelinedRDD'

txt_.map(lambda x:(x, x.split('1'))).filter(lambda y:y[0].startswith('北京')):表示在返回 (x, x.split('1')) 后,进行筛选filter,获取其中以 '北京' 开头的行,并按照相同格式 (例如,这里是(x, x.split('1'))格式,即原数据+分割后的列表数据) 返回数据

txt_.collect():返回所有RDD数据元素,当数据量很大时谨慎操作

txt_.toDF():不能直接转成DataFrame格式,需要设置Schema

注意:RDD格式不能用show()方法

##

免责声明:文章转载自《pyspark获取和处理RDD数据》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇《Spring源码深度解析》一嵌入式驱动开发之---Linux ALSA音频驱动(一)下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

mysql ,source 导入数据, mysqldump 导出数据,只导表结构

1.mysql 导入数据    mysql  数据库名 -u 用户名 -p 密码< 文件名       如:mysql -u root -p admin < test.sql (如果sql路径不是当前路径,改成绝对路径 mysql -u root -p admin < /home/root/test.sql) 2. source 导入数据...

在阿里云上搭建 Spark 实验平台

之前在自己的笔记本上运行 Python 代码,有些要运行一天多,一关机就前功尽弃,很不方便,所以才有租用阿里云服务器的想法,用了同学租的一台用了两天又觉得不够使,索性就自己租了三台,配置如下,三台一共约 320 块。 CPU:1核 内存:2048 MB 操作系统:Ubuntu 14.04 64位 带宽计费方式:按固定带宽 当前使用带宽:1Mbps 实例规格...

C# 解决串口接收数据不完整

方法1:使 用缓存机制完成。首先通过定义一个成员变量List<byte> buffer = new List<byte> (4096);用来存放所有的数据,在接收函数里,通过buffer.AddRange()方法不断地将接收到的数据加入到buffer中,并同时对 buffer中的数据进行检验,如果达到一定的长度并且校验结果正确(校验...

在workbench中导入.sql文件!(导入数据库文件)

第一步,登陆mysql workbench 第二步,打开自己的数据 ,此处默认(root) 打开数据库后页面 : 第三步,新建一个schema ,随便给个名字,这里起名为test : 可以看到test 内的table ,views,routines,等选项都是没有任何内容的。 第四步 ,在file 下打开你需要导入的.sql 文件。...

[读书笔记] Python 数据分析 (十一)经济和金融数据应用

resample: 重采样函数,可以按照时间来提高或者降低采样频率,fill_method可以使用不同的填充方式。 pandas.data_range 的freq参数枚举: Alias Description B business day frequency C custom business day frequency D calend...

Sql Server 2012 数据库同步方式 (发布、订阅)

上篇中说了通过SQL JOB的方式对数据库的同步,这一节作为上一节的延续介绍通过发布订阅的方式实现数据库之间的同步操作。发布订阅份为两个步骤:1、发布。2订阅。首先在数据源数据库服务器上对需要同步的数据进行发布,然后在目标数据库服务器上对上述发布进行订阅。发布可以发布一张表的部分数据,也可以对整张表进行发布。下面分别介绍发布、订阅的过程。   1、发布。发...