Spark 数据读取与保存(输入、输出)

摘要:
SaveAsTextFile(字符串)scala&gt:importsscala.util.parsing.json.json(2)将json文件上载到HDFS[lxl@hadoop102spark]$hadoopfs投入。/示例/src/main/resources/people。json/(3)读取文件scala>
4.数据读取与保存
  Spark 的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。
文件格式分为:Text 文件Json 文件、Csv 文件、Sequence 文件以及 Object 文件;
文件系统分为:本地文件系统、HDFSHBASE 以及数据库。
 
Spark 数据读取与保存(输入、输出)第1张
 
1)数据读取:textFile(String) 
scala> val hdfsFile = sc.textFile("hdfs://hadoop102:9000/fruit.txt")
hdfsFile: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/fruit.txt MapPartitionsRDD[21] at textFile at <console>:24
 
 
2)数据保存: saveAsTextFile(String)
scala> hdfsFile.saveAsTextFile("/fruitOut")

Spark 数据读取与保存(输入、输出)第2张

 
 

4.1.2 Json 文件 

  如果 JSON 文件中每一行就是一个 JSON 记录,那么可以通过将 JSON 文件当做文本
文件来读取,然后利用相关的 JSON 库对每一条数据进行 JSON 解析。
 
  注意:使用 RDD 读取 JSON 文件处理很复杂,同时 SparkSQL 集成了很好的处理
JSON 文件的方式,所以应用中多是采用 SparkSQL 处理 JSON 文件。 
 
(1)导入解析 json 所需的包 
scala> import scala.util.parsing.json.JSON
(2)上传 json 文件到 HDFS
[lxl@hadoop102 spark]$ hadoop fs -put ./examples/src/main/resources/people.json /
(3)读取文件
scala> val json = sc.textFile("/people.json")
json: org.apache.spark.rdd.RDD[String] = /people.json MapPartitionsRDD[8] at textFile at <console>:24
(4)解析 json 数据
scala> val result = json.map(JSON.parseFull)
result: org.apache.spark.rdd.RDD[Option[Any]] = MapPartitionsRDD[10] at map at <console>:27
(5)打印
scala> result.collect
res11: Array[Option[Any]] = Array(Some(Map(name -> Michael)), Some(Map(name -> Andy, age -> 30.0)), Some(Map(name -> Justin, age -> 19.0)))

4.1.3 Sequence 文件 

  SequenceFile 文件是 Hadoop 用来存储二进制形式的 key-value 对而设计的一种平面
文件(Flat File)。Spark 有专门用来读取 SequenceFile 的接口。在 SparkContext 中,可以
调用 sequenceFile[ keyClass, valueClass](path)。
注意:SequenceFile 文件只针对 PairRDD 
 
(1)创建一个 RDD
scala> val rdd = sc.parallelize(Array((1,2),(3,4),(5,6)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:24
(2)将 RDD 保存为 Sequence 文件
scala> rdd.saveAsSequenceFile("file:///opt/module/spark/seqFile")
(3)查看该文件 
[lxl@hadoop102 seqFile]$ pwd
/opt/module/spark/seqFile
[lxl@hadoop102 seqFile]$ ll 总用量
8 -rw-r--r-- 1 atguigu atguigu 108 109 10:29 part-00000 -rw-r--r-- 1 atguigu atguigu 124 109 10:29 part-00001 -rw-r--r-- 1 atguigu atguigu 0 109 10:29 _SUCCESS
[lxl@hadoop102 seqFile]$
cat part-00000 SEQ org.apache.hadoop.io.IntWritable org.apache.hadoop.io.IntWritableط
(4)读取 Sequence 文件 
scala> val seq = sc.sequenceFile[Int,Int]("file:///opt/module/spark/seqFile")
seq: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[18] at sequenceFile at <console>:24
(5)打印读取后的 Sequence 文件
scala> seq.collect
res14: Array[(Int, Int)] = Array((1,2), (3,4), (5,6))

4.1.4 对象文件 (objectFile)

  对象文件是将对象序列化后保存的文件,采用 Java 的序列化机制。可以通过
objectFile[k,v](path) 函数接收一个路径,读取对象文件,返回对应的 RDD,也可以通过调
用 saveAsObjectFile() 实现对对象文件的输出。因为是序列化所以要指定类型。 
 
(1)创建一个 RDD
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[19] at parallelize at <console>:24
(2)将 RDD 保存为 Object 文件 
scala> rdd.saveAsObjectFile("file:///opt/module/spark/objectFile")
(3)查看该文件
[lxl@hadoop102 object]$ pwd
/opt/module/spark/object

  [lxl@hadoop102 object]$ ll
  总用量 16
  -rw-r--r-- 1 lxl lxl 138 7月 8 03:12 part-00000
  -rw-r--r-- 1 lxl lxl 138 7月 8 03:12 part-00001
  -rw-r--r-- 1 lxl lxl 138 7月 8 03:12 part-00002
  -rw-r--r-- 1 lxl lxl 142 7月 8 03:12 part-00003
  -rw-r--r-- 1 lxl lxl 0 7月 8 03:12 _SUCCESS

  [lxl@hadoop102 object]$ cat part-00000
  SEQ!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritabley.)a�¬촲[IMº`&v겥xp

(4)读取 Object 文件 
scala> val objFile = sc.objectFile[(Int)]("file:///opt/module/spark/objectFile")
objFile: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[31] at objectFile at <console>:24
(5)打印读取后的 Sequence 文件 
scala> objFile.collect
res19: Array[Int] = Array(1, 2, 3, 4)

4.2 文件系统类数据读取与保存 

4.2.1 HDFS 

  Spark 的整个生态系统与 Hadoop 是完全兼容的,所以对于 Hadoop 所支持的文件类型
或者数据库类型,Spark 也同样支持.另外,由于 Hadoop 的 API 有新旧两个版本,所以 Spark 为
了能够兼容 Hadoop 所有的版本,也提供了两套创建操作接口.对于外部存储创建操作而
言,hadoopRDD 和 newHadoopRDD 是最为抽象的两个函数接口,主要包含以下四个参数.
 
  1)输入格式(InputFormat): 制定数据输入的类型,如 TextInputFormat 等,新旧两个版本
所引用的版本分别是 org.apache.hadoop.mapred.InputFormat 和
org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
  2)键类型: 指定[K,V]键值对中 K 的类型
  3)值类型: 指定[K,V]键值对中 V 的类型
  4)分区值: 指定由外部存储生成的 RDD 的 partition 数量的最小值,如果没有指定,系
统会使用默认值 defaultMinSplits
 
注意:其他创建操作的 API 接口都是为了方便最终的 Spark 程序开发者而设置的,是这两个
接口的高效实现版本.例如,对于 textFile 而言,只有 path 这个指定文件路径的参数,其他参数
在系统内部指定了默认值。
 
  1.在 Hadoop 中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为
Hadoop 本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压.
  2.如果用 Spark 从 Hadoop 中读取某种类型的数据不知道怎么读取的时候,上网查找一个
使用 map-reduce 的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的
hadoopRDD 和 newAPIHadoopRDD 两个类就行了 
Spark 数据读取与保存(输入、输出)第3张
 
 
 

4.2.2 MySQL 数据库连接 

支持通过 Java JDBC 访问关系型数据库。需要通过 JdbcRDD 进行,示例如下:
 
(1)添加依赖 
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
(2)Mysql 读取: 
package com.lxl
import java.sql.DriverManager import org.apache.spark.rdd.JdbcRDD import org.apache.spark.{SparkConf, SparkContext}
object MysqlRDD { def main(args: Array[String]): Unit = {
//1.创建 spark 配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//2.创建 SparkContext val sc = new SparkContext(sparkConf)
//3.定义连接 mysql 的参数 val driver = "com.mysql.jdbc.Driver" val url = "jdbc:mysql://hadoop102:3306/rdd" val userName = "root" val passWd = "000000"

//创建 JdbcRDD val rdd = new JdbcRDD(sc, () => { Class.forName(driver) DriverManager.getConnection(url, userName, passWd) }, "select * from `rddtable` where `id` >= ? and id <= ?;", 1, 10, 1, r => (r.getInt(1), r.getString(2)) )
//打印最后结果 println(rdd.count()) rdd.foreach(println) sc.stop() } }
Mysql 写入: 
def main(args: Array[String]) {
  val sparkConf = new SparkConf().setMaster("local[2]").setAppName("HBaseApp")
  val sc = new SparkContext(sparkConf)
  val data = sc.parallelize(List("Female", "Male","Female"))
  data.foreachPartition(insertData)
}
def insertData(iterator: Iterator[String]): Unit
= {   Class.forName ("com.mysql.jdbc.Driver").newInstance()   val conn = java.sql.DriverManager.getConnection("jdbc:mysql://master01:3306/rdd", "root","hive")
  iterator.foreach(data
=> {     val ps = conn.prepareStatement("insert into rddtable(name) values (?)")     ps.setString(1, data)     ps.executeUpdate()   }) }

spark-shell 中使用 JDBC 连接 Mysql:

[lxl@hadoop102 spark]$ cp /opt/module/hive/lib/mysql-connector-java-5.1.27-bin.jar ./jars/
scala> val rdd = new org.apache.spark.rdd.JdbcRDD(sc, () => {
     |       Class.forName("com.mysql.jdbc.Driver")
     |       java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "000000") 
     |     },
     |       "select * from `rddtable` where id >= ? and id <= ?;",
     |       1,
     |       10,
     |       1,
     |       r => (r.getInt(1), r.getString(2))
     |     )
rdd: org.apache.spark.rdd.JdbcRDD[(Int, String)] = JdbcRDD[1] at JdbcRDD at <console>:24

scala> println(rdd.count())
3

scala> rdd.foreach(println)
(1,zhangsan)
(2,lisi)
(3,wangwu)

4.2.3 HBase 数据库 

  由于 org.apache.hadoop.hbase.mapreduce.TableInputFormat 类的实现,Spark 可以通过
Hadoop 输入格式访问 HBase。这个输入格式会返回键值对数据,其中键的类型为 org.
apache.hadoop.hbase.io.ImmutableBytesWritable,而值的类型为 org.apache.hadoop.hbase.client.
Result。 
 
(1)添加依赖 
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> </dependency>
(2)从 HBase 读取数据 
package com.lxl
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import org.apache.hadoop.hbase.util.Bytes
object HBaseSpark { def main(args: Array[String]): Unit = {
//创建 spark 配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JdbcRDD")
//创建 SparkContext val sc = new SparkContext(sparkConf)
//构建 HBase 配置信息 val conf: Configuration = HBaseConfiguration.create() conf.set("hbase.zookeeper.quorum", "hadoop102,hadoop103,hadoop104") conf.set(TableInputFormat.INPUT_TABLE, "rddtable")
//从 HBase 读取数据形成 RDD val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD( conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
val count: Long
= hbaseRDD.count() println(count)
//对 hbaseRDD 进行处理 hbaseRDD.foreach { case (_, result) => val key: String = Bytes.toString(result.getRow) val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"),Bytes.toBytes("name"))) val color: String = Bytes.toString(result.getValue(Bytes.toBytes("info"),Bytes.toBytes("color"))) println("RowKey:" + key + ",Name:" + name + ",Color:" + color) }
//关闭连接 sc.stop() } }
3)往 HBase 写入 
def main(args: Array[String]) {
//获取 Spark 配置信息并创建与 spark 的连接 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseApp") val sc = new SparkContext(sparkConf)
//创建 HBaseConf val conf = HBaseConfiguration.create() val jobConf = new JobConf(conf) jobConf.setOutputFormat(classOf[TableOutputFormat[ImmutableBytesWritable]]) jobConf.set(TableOutputFormat.OUTPUT_TABLE, "fruit_spark")
//构建 Hbase 表描述器 val fruitTable = TableName.valueOf("fruit_spark") val tableDescr = new HTableDescriptor(fruitTable) tableDescr.addFamily(new HColumnDescriptor("info".getBytes))
//创建 Hbase 表 val admin = new HBaseAdmin(conf) if (admin.tableExists(fruitTable)) { admin.disableTable(fruitTable) admin.deleteTable(fruitTable) } admin.createTable(tableDescr)
//定义往 Hbase 插入数据的方法 def convert(triple: (Int, String, Int)) = { val put = new Put(Bytes.toBytes(triple._1)) put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(triple._2)) put.addImmutable(Bytes.toBytes("info"), Bytes.toBytes("price"), Bytes.toBytes(triple._3)) (new ImmutableBytesWritable, put) }
//创建一个 RDD val initialRDD = sc.parallelize(List((1,"apple",11), (2,"banana",12), (3,"pear",13)))
//将 RDD 内容写到 HBase val localData = initialRDD.map(convert) localData.saveAsHadoopDataset(jobConf) }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

免责声明:文章转载自《Spark 数据读取与保存(输入、输出)》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇Systemd简介与使用Github仓库重命名下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

linux下svn命令的使用

1、将文件checkout到本地目录   svn checkout path(path是服务器上的目录)    例如:svn checkout svn://192.168.1.1/pro/domain     简写:svn co 2、往版本库中添加新的文件     svn addfile    例如:svn add test.php(添加test.php)...

数据库(增、删、改、查)

数据库(增、删、改、查) 数据库: 三个层次:文件--服务--界面 (DBMS) 两种登录方式的设置:Windows身份登录;SqlServer身份登录。 如何设置SQLServer身份验证? 1.对象资源管理器右击--属性--安全性--SqlServer和Windows身份登录。 2.对象资源管理器--安全性--登录--sa--右击--属性--常规--...

SSL相关漏洞解决方法

最近用绿盟扫描系统进行内网网系统扫描,有几台设备被扫出了SSL相关漏洞,在此做一个简短的加固方法。 本次涉及漏洞 1.漏洞名称:SSL 3.0 POODLE攻击信息泄露漏洞(CVE-2014-3566)【原理扫描】 2.SSL/TLS 受诫礼(BAR-MITZVAH)攻击漏洞(CVE-2015-2808)【原理扫描】 知识普及1:SSL协议要点 SSL(S...

python socket编程介绍

一、概述 socket 通常被称作“套接字”,用于描述IP地址和端口,是通讯链的句柄,应用程序通过socket向网络发送请求或者回应网络的请求。 socket起源于UNIX,在linux、UNIX中“一切皆文件”,对于文件用 打开、读写、关闭 模式来操作。socket就是该模式的一个实现,是一种特殊的文件。一些socket函数就是对其进行的操作(读IO、写...

appscan 对api的手工检测

AppScan 在 API 安全测试中的实例介绍 在本项目中,API 遵循标准的的 REST 架构和背端服务器进行通信。针对 API 的功能测试由两部分组成:一部分是用一个 Web 的测试页面直接实现的,另一部分,由于 Web 页面的局限性(比如不能任意修改 HTTP header),所以是通过 Shell 脚本调用 curl 实现的。 并且这个 API...

解析ISO8583报文实例

本篇文章参考了中国银联POS终端规范,所以如有不明白的可以去我的资源里面下载。现在我们有ISO8583报文如下(十六进制表示法):60 00 03 00 00 60 31 00 31 07 30 02 00 30 20 04 C0 20 C0 98 11 00 00 00 00 00 00 00 00 01 00 03 49 02 10 00 12 30...