MapReduce in Practice (2): Sorting with a Custom Type

Requirement:

Building on the previous exercise, we want to output the results sorted by total flow, from largest to smallest.

Approach:

By default, MapReduce sorts keys as strings, in lexicographic order. To sort by an arbitrary criterion, make the key a custom class and give it a compareTo method (return 1 if this object is greater than the one being compared, 0 if equal, and -1 if it is smaller).

Note: implementing only java.lang.Comparable here eventually fails at runtime; implement WritableComparable directly instead.

FlowBean.java is modified as follows:

package cn.darrenchan.hadoop.mr.flow;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {
    private String phoneNum; // phone number
    private long upFlow;     // upstream traffic
    private long downFlow;   // downstream traffic
    private long sumFlow;    // total traffic

    // A no-arg constructor is required so Hadoop can instantiate the bean via reflection
    public FlowBean() {
        super();
    }

    public FlowBean(String phoneNum, long upFlow, long downFlow) {
        super();
        this.phoneNum = phoneNum;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public String getPhoneNum() {
        return phoneNum;
    }

    public void setPhoneNum(String phoneNum) {
        this.phoneNum = phoneNum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's fields from the data stream.
    // Fields must be read in exactly the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNum = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's fields to the data stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNum);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Sort by total traffic in descending order. Note that this never returns 0,
    // so records with the same sumFlow are not merged into one group in the reduce phase.
    @Override
    public int compareTo(FlowBean flowBean) {
        return sumFlow > flowBean.getSumFlow() ? -1 : 1;
    }
}
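As a quick local sanity check (not part of the original post), the bean's ordering and serialization can be exercised outside the cluster. The class name FlowBeanCheck and the sample phone numbers below are illustrative only; the sketch assumes the FlowBean class above is on the classpath.

package cn.darrenchan.hadoop.mr.flow;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;

public class FlowBeanCheck {
    public static void main(String[] args) throws Exception {
        FlowBean a = new FlowBean("13600000001", 100, 200);   // sum = 300
        FlowBean b = new FlowBean("13600000002", 1000, 2000); // sum = 3000

        // compareTo sorts by total flow, descending: b should print before a
        FlowBean[] beans = {a, b};
        Arrays.sort(beans);
        System.out.println(Arrays.toString(beans));

        // write/readFields round trip: fields come back in the order they were written
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        a.write(new DataOutputStream(bytes));
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getPhoneNum() + " -> " + copy);
    }
}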

Create the file SortMR.java:

package cn.darrenchan.hadoop.mr.flowsort;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import cn.darrenchan.hadoop.mr.flow.FlowBean;

// Run with: hadoop jar flowsort.jar cn.darrenchan.hadoop.mr.flowsort.SortMR /flow/output /flow/outputsort
public class SortMR {
    public static class SortMapper extends
            Mapper<LongWritable, Text, FlowBean, NullWritable> {
        // Take one line, split out the fields, wrap them in a FlowBean and emit it as the key
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = StringUtils.split(line, "\t");
            String phoneNum = words[0];
            long upFlow = Long.parseLong(words[1]);
            long downFlow = Long.parseLong(words[2]);
            context.write(new FlowBean(phoneNum, upFlow, downFlow),
                    NullWritable.get());
        }
    }

    public static class SortReducer extends
            Reducer<FlowBean, NullWritable, Text, FlowBean> {
        // Keys arrive already sorted; write the phone number back out as the key
        @Override
        protected void reduce(FlowBean key, Iterable<NullWritable> values,
                Context context) throws IOException, InterruptedException {
            String phoneNum = key.getPhoneNum();
            context.write(new Text(phoneNum), key);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortMR.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
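The job log further down contains a WARN line suggesting that the driver implement the Tool interface and be launched through ToolRunner, so that Hadoop's generic command-line options (-D, -files, and so on) are parsed. A sketch of how the driver above could be adapted is shown below; it is not part of the original post, and the class name SortMRTool is hypothetical.

package cn.darrenchan.hadoop.mr.flowsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import cn.darrenchan.hadoop.mr.flow.FlowBean;

// Same job as SortMR, but run through ToolRunner so generic options are handled
public class SortMRTool extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(SortMRTool.class);
        job.setMapperClass(SortMR.SortMapper.class);
        job.setReducerClass(SortMR.SortReducer.class);
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new SortMRTool(), args));
    }
}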

The input this time is the output of the previous run. Package the classes into flowsort.jar and execute:

hadoop jar flowsort.jar cn.darrenchan.hadoop.mr.flowsort.SortMR /flow/output /flow/outputsort

The job produces the following log:

17/02/26 05:22:36 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
17/02/26 05:22:36 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/02/26 05:22:36 INFO input.FileInputFormat: Total input paths to process : 1
17/02/26 05:22:36 INFO mapreduce.JobSubmitter: number of splits:1
17/02/26 05:22:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0003
17/02/26 05:22:37 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0003
17/02/26 05:22:37 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0003/
17/02/26 05:22:37 INFO mapreduce.Job: Running job: job_1488112052214_0003
17/02/26 05:24:16 INFO mapreduce.Job: Job job_1488112052214_0003 running in uber mode : false
17/02/26 05:24:16 INFO mapreduce.Job: map 0% reduce 0%
17/02/26 05:24:22 INFO mapreduce.Job: map 100% reduce 0%
17/02/26 05:24:28 INFO mapreduce.Job: map 100% reduce 100%
17/02/26 05:24:28 INFO mapreduce.Job: Job job_1488112052214_0003 completed successfully
17/02/26 05:24:28 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=933
FILE: Number of bytes written=187799
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=735
HDFS: Number of bytes written=623
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3077
Total time spent by all reduces in occupied slots (ms)=2350
Total time spent by all map tasks (ms)=3077
Total time spent by all reduce tasks (ms)=2350
Total vcore-seconds taken by all map tasks=3077
Total vcore-seconds taken by all reduce tasks=2350
Total megabyte-seconds taken by all map tasks=3150848
Total megabyte-seconds taken by all reduce tasks=2406400
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=883
Map output materialized bytes=933
Input split bytes=112
Combine input records=0
Combine output records=0
Reduce input groups=22
Reduce shuffle bytes=933
Reduce input records=22
Reduce output records=22
Spilled Records=44
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=142
CPU time spent (ms)=1280
Physical memory (bytes) snapshot=218406912
Virtual memory (bytes) snapshot=726446080
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=623
File Output Format Counters
Bytes Written=623

The final output is shown below (phone number, upstream flow, downstream flow, total flow); it is sorted by total flow in descending order as expected.

1363157985069 186852 200 187052
1363157985066 2481 24681 27162
1363157990043 63 11058 11121
1363157986072 18 9531 9549
1363157982040 102 7335 7437
1363157984041 9 6960 6969
1363157995093 3008 3720 6728
1363157995074 4116 1432 5548
1363157992093 4938 200 5138
1363157973098 27 3659 3686
1363157995033 20 3156 3176
1363157984040 12 1938 1950
1363157986029 3 1938 1941
1363157991076 1512 200 1712
1363157993044 12 1527 1539
1363157993055 954 200 1154
1363157985079 180 200 380
1363157986041 180 200 380
1363157988072 120 200 320
1363154400022 0 200 200
1363157983019 0 200 200
1363157995052 0 200 200
