spark保存读取csv SequenceFile

摘要:
importorg.apache.spark.api.java.function.function;importjava.util.Arrays;

前言

Spark读取和保存文件格式是非常多的,json,csv,haoop SequenceFile ,hbase等等。本文就是简单的spark读取文件

spark 读写csv

使用opencsv jar包读取,先在maven配置。
读取方式因逐行读取、以单个文件为key读取整个文件,代码实现略有不同

逐行读取

package com.learn.hadoop.spark.doc.analysis.chpater.datasave;

import com.opencsv.CSVReader;
import com.opencsv.CSVWriter;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.StringReader;
import java.io.StringWriter;
import java.util.Arrays;

public class DataSaveTest02Csv {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("DataSaveTest02");
        JavaSparkContext sc  = new JavaSparkContext(conf);
        String inputfile = "D:\my\code\github\learncode\spark\src\main\resources\res\sparksave\";
        //读取文件或者文件夹下所有文件,以每行记录读取
        JavaRDD<String> rdd = sc.textFile(inputfile);
        //打印读取的内容,打印出来的是所有行的string
        System.out.println(rdd.collect().toString());
        //JavaRDD<String[]> csvData =rdd.map(new ParseLine2());
        JavaRDD<String[]> csvData =rdd.map(new Function<String, String[]>() {
            @Override
            public String[] call(String s) throws Exception {
                CSVReader reader = new CSVReader(new StringReader(s));
                return reader.readNext();
            }
        });
        //输出csv 每行的数组
        csvData.foreach(f-> System.out.println(Arrays.asList(f).toString()));

        //test write
        String outfile ="C:\Users\juncai\Desktop\out";
        //创建一个JavaRDD<String []>,直接就赋值
        JavaRDD<String []> outrdd =csvData;
        //一行一行的去存
        outrdd.map(new Function<String[],String>(){
            @Override
            public String call(String[] strings) throws Exception {
                StringWriter stringWriter = new StringWriter();
                CSVWriter csvWriter = new CSVWriter(stringWriter);
                csvWriter.writeNext(strings);
                return stringWriter.toString();
            }
        }).saveAsTextFile(outfile);
    }
}
/*
目录下两个文件,相同的内容
    1,jack,male,29
    2,linda,female,29

输出
    [1,jack,male,29, 2,linda,female,29, 1,jack,male,29, 2,linda,female,29]
    [1, jack, male, 29]
    [2, linda, female, 29]
    [1, jack, male, 29]
    [2, linda, female, 29]
 */

单个文件为key读取整个文件

textFiles与wholeTextFiles方法区别就是,wholeTextFiles文件为key读取整个文件,是键值对的输出。
可以看下输出读取文件的时候的输出的差别

package com.learn.hadoop.spark.doc.analysis.chpater.datasave;

import com.opencsv.CSVReader;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import scala.Tuple2;

import java.io.StringReader;
import java.util.Arrays;
import java.util.Iterator;

/**
 * 测试spark 数据保存和读取
 * 读取csv文件
 */
public class DataSaveTest01Csv {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local").setAppName("DataSaveTest01");
        JavaSparkContext sc  = new JavaSparkContext(conf);
        //wholeTextFiles输出整个目录的文件,每个文件就是一个记录,不是按照行读取
        String inputfile = "D:\my\code\github\learncode\spark\src\main\resources\res\sparksave\";
        //读取文件或者目录下的数据,以文件为一个单独的记录读取
        JavaPairRDD<String,String> csvData =sc.wholeTextFiles(inputfile);
        //打印读取的文件与文件内容键值对
        System.out.println(csvData.collect());
        //JavaRDD<String []>keyedRdd =csvData.flatMap(new ParseLine());
        //只输出文件中的内容,不作其他的处理
        JavaRDD<String []>keyedRdd =csvData.flatMap(new FlatMapFunction<Tuple2<String, String>, String[]>() {
            @Override
            public Iterator<String[]> call(Tuple2<String, String> stringStringTuple2) throws Exception {
                CSVReader reader = new CSVReader(new StringReader(stringStringTuple2._2));
                return reader.readAll().iterator();
            }
        });
        //keyedRdd.foreach(x -> System.out.println(x);输出的是对象
        keyedRdd.foreach(x -> System.out.println(Arrays.asList(x).toString()));
    }

}
/*
目录下两个文件,相同的内容
    1,jack,male,29
    2,linda,female,29

输出
    [(file:/D:/my/code/github/learncode/spark/src/main/resources/res/sparksave/datasave - 副本.csv,1,jack,male,29
    2,linda,female,29
    ), (file:/D:/my/code/github/learncode/spark/src/main/resources/res/sparksave/datasave.csv,1,jack,male,29
    2,linda,female,29
    )]
    [1, jack, male, 29]
    [2, linda, female, 29]
    [1, jack, male, 29]
    [2, linda, female, 29]
 */

Sequence的读写

SequenceFile 是由没有相对关系结构的键值对文件组成的常用 Hadoop 格式。SequenceFile 也是Hadoop MapReduce 作业中常用的输入输出格式,
所以如果你在使用一个已有的 Hadoop 系统,数据很有可能是以 SequenceFile 的格式供你使用的。由于 Hadoop 使用了一套自定义的序列化框架,
因此 SequenceFile 是由实现 Hadoop 的 Writable接口的元素组成。

package com.learn.hadoop.spark.doc.analysis.chpater.datasave;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileAsBinaryOutputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

public class DataSaveTest03Sequence {
    public static void main(String[] args) {
        SparkConf conf =new SparkConf().setMaster("local").setAppName("DataSaveTest03Sequence");
        JavaSparkContext sc = new JavaSparkContext(conf);
        //序列化键值对
        JavaPairRDD<String,Integer> rdd = sc.parallelizePairs(Arrays.asList(new Tuple2<String,Integer>("string one",1),
                new Tuple2<String,Integer>("string two",2)),1);

        //返回SequenceFile所支持的格式的键值对
        JavaPairRDD<Text,IntWritable>  result = rdd.mapToPair(new PairFunction<Tuple2<String, Integer>, Text, IntWritable>() {
            @Override
            public Tuple2<Text, IntWritable> call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return new Tuple2<Text, IntWritable>(new Text(stringIntegerTuple2._1),new IntWritable(stringIntegerTuple2._2));
            }
        });
        //输出键值对
        result.saveAsHadoopFile("C:\Users\juncai\Desktop\out", Text.class,
                IntWritable.class, SequenceFileOutputFormat.class);

        //test read
        String filepath = "D:\my\code\github\learncode\spark\src\main\resources\res\saprksavesequence\part-00000";
        //SequenceFile是键值对的hadoop文件
        //直接读取hadoop文件,转化为hadoop键值对
        JavaPairRDD<Text,IntWritable> input = sc.sequenceFile(filepath,Text.class,IntWritable.class,1);
        input.foreach(f-> System.out.println(f.toString()));
        //转为普通的键值对。maiToPair是键值对转换函数
        JavaPairRDD<String ,Integer> outRdd = input.mapToPair(new PairFunction<Tuple2<Text, IntWritable>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Text, IntWritable> textIntWritableTuple2) throws Exception {
                return new Tuple2<String, Integer>(textIntWritableTuple2._1.toString(),textIntWritableTuple2._2.get());
            }
        });
        outRdd.foreach(f-> System.out.println(f.toString()));
    }
}

免责声明:文章转载自《spark保存读取csv SequenceFile》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇matlab练习程序(动感模糊)pyqt 实现左列表向右列表添加下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Golang的优雅重启

更新(2015年4月):Florian von Bock已将本文中描述的内容转换为一个名为endless的优秀Go包 。 如果您有Golang HTTP服务,可能需要重新启动它以升级二进制文件或更改某些配置。如果你(像我一样)因为网络服务器处理它而优雅地重新启动是理所当然的,你可能会发现这个配方非常方便,因为使用Golang你需要自己动手。 实际上这里有...

App 抓包提示网络异常怎么破?(抓包HTTPS)

  背景 当你测试App的时候,想要通过Fiddler/Charles等工具抓包看下https请求的数据情况,发现大部分的App都提示网络异常/无数据等等信息。以“贝壳找房”为例: Fiddler中看到的请求是这样的: 你可能开始找证书的问题:是不是Fiddler/Charles的证书没有导入的手机中去?配置一遍又一遍,又开始对比web端浏览器的ht...

js上传

js上传目前有很多的方法,有控件,有自定义的等等 下面为在项目中用到的一个自定义的上传,不依赖任何的控件 ///上传触发事件 function StartLoadCAD() {     var fileupload = document.getElementById('filePro').files;     for (var i = 0; i &l...

FutureTask详解

1 基本概念 1.1 Callable与Future Runnable封装一个异步运行的任务,可以把它想象成为一个没有参数和返回值的异步方法。Callable与Runnable类似,但是有返回值。Callable接口是一个参数化的类型,只有一个方法call。 public interface Callable<V> { V call() th...

munge源码编译

1. 下载安装munge 创建普通帐号munge,用于运行munged 下载源码:https://github.com/dun/munge/releases/tag/munge-0.5.14 解压:tar -xvf munge-0.5.14.tar.xz 进入目录:cd munge-0.5.14 创建安装目录:mkdir -p /usr/local/hpc...

IE6中location不跳转问题

前天一我遇到个看似很诡异的问题,就是<a href="javascript:void(0);" onclick="window.location.href=url"></a>在IE6下面没反应,不跳转到onclik事件中的"window.location.href"。 当时我们在网上找了篇文章很快就解决了,但是文章中没有说明具体原因在...