Spark(十二)【SparkSql中数据读取和保存】

摘要:
I Reading and Saving Description SparkSQL提供了一种保存数据和加载数据的常规方式,以及一种特殊的读取方式:常规和特殊保存和保存有四种模式:默认:错误:输出目录存在时报告错误追加:将覆盖追加到输出目录:覆盖忽略:忽略,不写两种。数据格式1.ParquetSparkSQL的默认数据源是Parquet格式。当数据源是Parquet文件时,SparkSQL可以轻松地执行所有操作而不使用格式。读取valdf=火花。read.load保存//读取json文件格式vardf=spark。read.json//将其保存为请求格式df.write.mode.save2.JsonSparkSQL可以自动推断json数据集的结构并将其加载为数据集[Row]。您可以通过SparkSession加载JSON文件。阅读json()。

一. 读取和保存说明

SparkSQL提供了通用的保存数据和数据加载的方式,还提供了专用的方式

读取:通用和专用

保存

保存有四种模式:
  默认: error :  输出目录存在就报错
        append:  向输出目录追加
        overwrite : 覆盖写
        ignore:  忽略,不写

二. 数据格式

1. Parquet

Spark SQL的默认数据源为Parquet格式。Parquet是一种能够有效存储嵌套数据的列式存储格式。

数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可修改默认数据源格式。

读取

val df = spark.read.load("examples/src/main/resources/users.parquet")

保存

//读取json文件格式
var df = spark.read.json("/opt/module/data/input/people.json")
//保存为parquet格式
df.write.mode("append").save("/opt/module/data/output")

2. Json

Spark SQL 能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]. 可以通过SparkSession.read.json()去加载JSON 文件。

注意:Spark读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。

数据格式:employees.json

{"name":"Michael"}
{"name":"Andy", "age":30}

1)导入隐式转换

import spark.implicits._

2)读取Json文件

//专用的读取
val df1: DataFrame = sparkSession.read.json("input/employees.json")
//通用读取
val df: DataFrame = sparkSession.read.format("json").load("input/employees.json")

3)保存为Json文件

    //导隐式包,转为DataSet
    import sparkSession.implicits.
    val ds: Dataset[Emp] = rdd.toDS()
    ds.write.mode("overwrite")json("output/emp.json")

3. CSV

CSV: 逗号作为字段分割符的文件
tsv: ,tab作为字段分割符的文件

读取

    // 通用的读取
    val df: DataFrame = sparkSession.read.format("csv").load("input/person.csv")
    // 专用的读
    val df1: DataFrame = sparkSession.read.csv("input/person.csv")

保存

CSV的参数可以到DataFrameReader 609行查看

//DataFrame
df1.write.option("sep",",").mode("overwrite").csv("output/csv")

4. Mysql

读取

    val props = new Properties()
    /*
        JDBC中能写什么参数,参考 JDBCOptions 223行
     */
    props.put("user","root")
    props.put("password","root")
    //库名
    val df: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/spark_test", "tbl_user", props)
    // 全表查询    只显示前N条
    df.show()
    //指定查询
    df.createTempView("user")
    sparkSession.sql("select * from user where id > 5").show()

保存

    val list = List(Emp("jack", 2222.22), Emp("jack1", 3222.22), Emp("jack2", 4222.22))
    val rdd: RDD[Emp] = sparkSession.sparkContext.makeRDD(list, 1)
    //导入隐式包
    import sparkSession.implicits._
    val ds: Dataset[Emp] = rdd.toDS()
    val props = new Properties()
    props.put("user","root")
    props.put("password","root")
    //  表名可以是已经存在的表t1,也可以是一张新表t1(用的多)  
    
    //专用的写
    ds.write.jdbc("jdbc:mysql://localhost:3306/0508","t1",props)


    // 通用的写
    ds.write.
      option("url","jdbc:mysql://localhost:3306/库名")
		//表名
      .option("dbtable","t2")
      .option("user","root")
      .option("password","root")
      .mode("append")
      .format("jdbc").save()

免责声明:文章转载自《Spark(十二)【SparkSql中数据读取和保存】》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇jemter上传到服务器进行压测记一次数据库的优化下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Spark资源管理

Spark资源管理 1、介绍 Spark资源管控分为spark集群自身可支配资源配置和job所用资源配置。 2、spark集群支配资源控制 在spark的conf/spark-env.sh文件中可以指定master和worker的支配资源数。 2.1 Spark集群可支配资源配置 每个worker使用内核数 # 每个worker使用的内核数,默认是所有...

pyspark mongodb yarn

from pyspark.sql import SparkSessionmy_spark = SparkSession .builder .appName("myApp") .config("spark.mongodb.input.uri", "mongodb://pyspark_admin:admin123@192.168.2.5...

大数据--Spark原理

Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一,与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势: 1.运行速度快,Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供...

Spark启动流程(Standalone)-分析

1、start-all.sh脚本,实际上执行java -cp Master 和 java -cp Worker 2、Master 启动时首先穿件一个RpcEnv对象,负责管理所有通信逻辑 3、Master 通信RpcEnv对象创建一个Endpoint,Master就是一个Endpoint,Worker可以与其进行通信 4、Worker启动时也是创建一个R...

大数据技术-spark+hive+hbase研究

大数据 spark 研究(0基础入门)一 背景 1 基础 Scala 语言基础:Scala详细总结(精辟版++) spark 介绍    :  spark介绍     二 环境 1 部署spark   <![if !supportLists]>1、<![endif]>环境准备(1)配套软件版本要求: Java 6+  Python...

scala之 spark连接SQL和HIVE/IDEA操作HDFS

一、连接SQL 方法一、 package com.njbdqn.linkSql import java.util.Properties import org.apache.spark.sql.SparkSession import org.apache.spark.sql._ object LinkSql { def main(args: Arr...