需求:
基于上一道题,我想将结果按照总流量的大小由大到小输出。
思考:
默认mapreduce是对key字符串按照字母进行排序的,而我们想任意排序,只需要把key设成一个类,再对该类写一个compareTo(大于要比较对象返回1,等于返回0,小于返回-1)方法就可以了。
注:这里如果是实现java.lang.Comparable接口,最终报错,还是直接实现WritableComparable吧。
FlowBean.java更改如下:
packagecn.darrenchan.hadoop.mr.flow; importjava.io.DataInput; importjava.io.DataOutput; importjava.io.IOException; importorg.apache.hadoop.io.Writable; importorg.apache.hadoop.io.WritableComparable; public class FlowBean implements WritableComparable<FlowBean>{ private String phoneNum;//手机号 private long upFlow;//上行流量 private long downFlow;//下行流量 private long sumFlow;//总流量 publicFlowBean() { super(); } public FlowBean(String phoneNum, long upFlow, longdownFlow) { super(); this.phoneNum =phoneNum; this.upFlow =upFlow; this.downFlow =downFlow; this.sumFlow = upFlow +downFlow; } publicString getPhoneNum() { returnphoneNum; } public voidsetPhoneNum(String phoneNum) { this.phoneNum =phoneNum; } public longgetUpFlow() { returnupFlow; } public void setUpFlow(longupFlow) { this.upFlow =upFlow; } public longgetDownFlow() { returndownFlow; } public void setDownFlow(longdownFlow) { this.downFlow =downFlow; } public longgetSumFlow() { returnsumFlow; } public void setSumFlow(longsumFlow) { this.sumFlow =sumFlow; } @Override publicString toString() { return upFlow + " " + downFlow + " " +sumFlow; } //从数据流中反序列出对象的数据 //从数据流中读出对象字段时,必须跟序列化时的顺序保持一致 @Override public void readFields(DataInput in) throwsIOException { phoneNum =in.readUTF(); upFlow =in.readLong(); downFlow =in.readLong(); sumFlow =in.readLong(); } //将对象数据序列化到流中 @Override public void write(DataOutput out) throwsIOException { out.writeUTF(phoneNum); out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } @Override public intcompareTo(FlowBean flowBean) { return sumFlow > flowBean.getSumFlow() ? -1 : 1; } }
建立文件SortMR.java:
packagecn.darrenchan.hadoop.mr.flowsort; importjava.io.IOException; importorg.apache.commons.io.output.NullWriter; importorg.apache.commons.lang.StringUtils; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.fs.Path; importorg.apache.hadoop.io.LongWritable; importorg.apache.hadoop.io.NullWritable; importorg.apache.hadoop.io.Text; importorg.apache.hadoop.mapreduce.Job; importorg.apache.hadoop.mapreduce.Mapper; importorg.apache.hadoop.mapreduce.Reducer; importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat; importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat; importcn.darrenchan.hadoop.mr.flow.FlowBean; //执行命令:hadoop jar flowsort.jar cn.darrenchan.hadoop.mr.flowsort.SortMR /flow/output /flow/outputsort public classSortMR { public static class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable>{ //拿到一行数据,切分出各字段,封装为一个flowbean,作为key输出 @Override protected voidmap(LongWritable key, Text value, Context context) throwsIOException, InterruptedException { String line =value.toString(); String[] words = StringUtils.split(line, " "); String phoneNum = words[0]; long upFlow = Long.parseLong(words[1]); long downFlow = Long.parseLong(words[2]); context.write(newFlowBean(phoneNum, upFlow, downFlow), NullWritable.get()); } } public static class SortReducer extends Reducer<FlowBean, NullWritable, Text, FlowBean>{ @Override protected void reduce(FlowBean key, Iterable<NullWritable>values, Context context) throwsIOException, InterruptedException { String phoneNum =key.getPhoneNum(); context.write(newText(phoneNum), key); } } public static void main(String[] args) throwsException { Configuration conf = newConfiguration(); Job job =Job.getInstance(conf); job.setJarByClass(SortMR.class); job.setMapperClass(SortMapper.class); job.setReducerClass(SortReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
我们现在处理的结果是上一次实验的输出结果,打成jar包flowsort.jar,执行命令:
hadoop jar flowsort.jar cn.darrenchan.hadoop.mr.flowsort.SortMR /flow/output /flow/outputsort
得到的处理信息如下:
17/02/26 05:22:36 INFO client.RMProxy: Connecting to ResourceManager at weekend110/192.168.230.134:8032
17/02/26 05:22:36 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/02/26 05:22:36 INFO input.FileInputFormat: Total input paths to process : 1
17/02/26 05:22:36 INFO mapreduce.JobSubmitter: number of splits:1
17/02/26 05:22:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1488112052214_0003
17/02/26 05:22:37 INFO impl.YarnClientImpl: Submitted application application_1488112052214_0003
17/02/26 05:22:37 INFO mapreduce.Job: The url to track the job: http://weekend110:8088/proxy/application_1488112052214_0003/
17/02/26 05:22:37 INFO mapreduce.Job: Running job: job_1488112052214_0003
17/02/26 05:24:16 INFO mapreduce.Job: Job job_1488112052214_0003 running in uber mode : false
17/02/26 05:24:16 INFO mapreduce.Job: map 0% reduce 0%
17/02/26 05:24:22 INFO mapreduce.Job: map 100% reduce 0%
17/02/26 05:24:28 INFO mapreduce.Job: map 100% reduce 100%
17/02/26 05:24:28 INFO mapreduce.Job: Job job_1488112052214_0003 completed successfully
17/02/26 05:24:28 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=933
FILE: Number of bytes written=187799
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=735
HDFS: Number of bytes written=623
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=3077
Total time spent by all reduces in occupied slots (ms)=2350
Total time spent by all map tasks (ms)=3077
Total time spent by all reduce tasks (ms)=2350
Total vcore-seconds taken by all map tasks=3077
Total vcore-seconds taken by all reduce tasks=2350
Total megabyte-seconds taken by all map tasks=3150848
Total megabyte-seconds taken by all reduce tasks=2406400
Map-Reduce Framework
Map input records=22
Map output records=22
Map output bytes=883
Map output materialized bytes=933
Input split bytes=112
Combine input records=0
Combine output records=0
Reduce input groups=22
Reduce shuffle bytes=933
Reduce input records=22
Reduce output records=22
Spilled Records=44
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=142
CPU time spent (ms)=1280
Physical memory (bytes) snapshot=218406912
Virtual memory (bytes) snapshot=726446080
Total committed heap usage (bytes)=137433088
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=623
File Output Format Counters
Bytes Written=623
最终结果如下,可以看到是排序好的。
1363157985069 186852 200 187052
1363157985066 2481 24681 27162
1363157990043 63 11058 11121
1363157986072 18 9531 9549
1363157982040 102 7335 7437
1363157984041 9 6960 6969
1363157995093 3008 3720 6728
1363157995074 4116 1432 5548
1363157992093 4938 200 5138
1363157973098 27 3659 3686
1363157995033 20 3156 3176
1363157984040 12 1938 1950
1363157986029 3 1938 1941
1363157991076 1512 200 1712
1363157993044 12 1527 1539
1363157993055 954 200 1154
1363157985079 180 200 380
1363157986041 180 200 380
1363157988072 120 200 320
1363154400022 0 200 200
1363157983019 0 200 200
1363157995052 0 200 200