Partitioner编程——根据运营商分组统计用户上网流量

摘要:
Partitioner是Partitioner的基类。如果需要自定义partitioner,还需要继承该类。HashPartitioner是mapreduce的默认分区器。计算方法为WHICHREDUCER=%numReduceTasks以获取当前目标减速器。如果要对v2进行排序,则需要将k2和v2作为k2组装到新类中,以参与比较。分组也根据k2进行比较。谁决定分区的数量?缩减器的数量与分区的数量一样多PublicClassDataCount{publicstaticvoid mainthrowsException{Configurationconf=newConfiguration() ; Jobjob=Job.getInstance;job.setJarByClass;job.setMapperClass;job.setMapOutputKeyClass;job.setMap输出值类;job.setReducerClass;job.setOutputKeyClass;job.setOutputValueClass;文件输入格式.setInputPaths;文件输出格式.setOutputPath;//设置partitioner的执行类job.setPartitionerClass;job.setNumReduceTasks;job.waitForCompletion;}//映射阶段k1:行号v1:一行数据k2:移动电话号码v2:javaBeanpublicstatisticclassDCMapperextendsMapper<LongWriteable,Text,Text,DataInfo˃{privateTextk=newText();@OverrideprotectedvoidmapthrowsIOException,InterruptedException{Stringline=value.toString();String[]fields=line.split(“”);Stringtel=fields[1];longup=Long.parseLong;longdown=Long.parseLong;DataInfodataInfo=newDataInfo;k.set;context.write;}//谁决定分区阶段的数量/***?
  1. Partitioner是partitioner的基类,如果需要定制partitioner也需要继承该类。

  2. HashPartitioner是mapreduce的默认partitioner。计算方法是
    which reducer=(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks,得到当前的目的reducer。

  3. (例子以jar形式运行)

排序和分组

  1. 在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较。
  2. 分组时也是按照k2进行比较的。

partition的数量由谁来决定?—-reducer!!

有多少reducer就有多少partitioner

public class DataCount {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        job.setJarByClass(DataCount.class);

        job.setMapperClass(DCMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DataInfo.class);

        job.setReducerClass(DCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DataInfo.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //设置partitioner的执行类
        job.setPartitionerClass(DCPartitioner.class);

        job.setNumReduceTasks(Integer.parseInt(args[2]));


        job.waitForCompletion(true);

    }
    //Map阶段 k1:行号 v1:一行数据 k2:手机号 v2:有用字段组成的javaBean
    public static class DCMapper extends Mapper<LongWritable, Text, Text, DataInfo>{

        private Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, DataInfo>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("	");
            String tel = fields[1];
            long up = Long.parseLong(fields[8]);
            long down = Long.parseLong(fields[9]);
            DataInfo dataInfo = new DataInfo(tel,up,down);
            k.set(tel);
            context.write(k, dataInfo);

        }

    }
    //Partition阶段  
    /**
      * partition的数量由谁来决定?----reducer !!
      *  有多少个reducer就有多少个partitioner
      */
    public static class DCPartitioner extends  Partitioner<Text, DataInfo>{
        //定义一个map用于存放运营商的对应分组号
        //static是自上往下执行的
        private static Map<String,Integer> provider = new HashMap<String,Integer>();

        static{
            provider.put("138", 1);
            provider.put("139", 1);
            provider.put("152", 2);
            provider.put("153", 2);
            provider.put("182", 3);
            provider.put("183", 3);
        }
        /**
          *返回值:int 分组号,一个组对应一个map
          */
        @Override
        public int getPartition(Text key, DataInfo value, int numPartitions) {
            //向数据库或配置信息 读写
            String tel_sub = key.toString().substring(0,3);
            //获取手机号前三位,对运营商进行分组识别
            Integer count = provider.get(tel_sub);
            if(count == null){
                count = 0;
            }
            //返回组号
            return count;
        }

    }

    //Reduce阶段 k2:手机号 v2:dataInfo迭代器 k3:手机号 v3:dataInfo
    public static class DCReducer extends Reducer<Text, DataInfo, Text, DataInfo>{

        @Override
        protected void reduce(Text key, Iterable<DataInfo> values,Reducer<Text, DataInfo, Text, DataInfo>.Context context)
                throws IOException, InterruptedException {
            long up_sum = 0;
            long down_sum = 0;
            for(DataInfo d : values){
                up_sum += d.getUpPayLoad();
                down_sum += d.getDownPayLoad();
            }
            DataInfo dataInfo = new DataInfo("",up_sum,down_sum);

            context.write(key, dataInfo);
        }

    }


}

免责声明:文章转载自《Partitioner编程——根据运营商分组统计用户上网流量》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇WMware克隆虚拟机后出现网络无法连接的问题N字形变化 flag标签转换方向下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

Kafka学习之路 (一)Kafka的简介

讨论QQ:1586558083 目录 一、简介 1.1 概述 1.2 消息系统介绍 1.3 点对点消息传递模式 1.4 发布-订阅消息传递模式 二、Kafka的优点 2.1 解耦 2.2 冗余(副本) 2.3 扩展性 2.4 灵活性&峰值处理能力 2.5 可恢复性 2.6 顺序保证 2.7 缓冲 2.8 异步通信 三、常用Me...

Oracle去除重复(某一列的值重复),取最新(日期字段最新)的一条数据

参考地址:https://blog.csdn.net/nux_123/article/details/45037719 解决思路:用Oracle的row_number() over函数来解决该问题。 解决过程: 1.查看表中的重复记录 SELECT * FROM find_new ; 2.标记重复的记录 select t.id,...

hive简介

http://www.51niux.com/ 一、Hive介绍 Hive官网:https://hive.apache.org/ 1.1 hive简介 Hive是一个数据仓库基础工具在Hadoop中用来处理结构化数据。它架构在Hadoop之上,总归为大数据,并使得查询和分析方便。并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。...

大数据入门第十七天——storm上游数据源 之kafka详解(一)入门与集群安装

一、概述 1.kafka是什么 根据标题可以有个概念:kafka是storm的上游数据源之一,也是一对经典的组合,就像郭德纲和于谦 根据官网:http://kafka.apache.org/intro的解释呢,是这样的: Apache Kafka® is a distributed streaming platform ApacheKafka®是一个分布...

剑指Offer28 最小的K个数(Partition函数应用+大顶堆)

包含了Partition函数的多种用法 以及大顶堆操作 1 /************************************************************************* 2 > File Name: 28_KLeastNumbers.cpp 3 > Author: Juntaran 4 > Mai...

分布式消息队列RocketMQ与Kafka架构上的巨大差异之1 -- 为什么RocketMQ要去除ZK依赖?

我们知道,在早期的RocketMQ版本中,是有依赖ZK的。而现在的版本中,是去掉了对ZK的依赖,转而使用自己开发的NameSrv。 并且这个NameSrv是无状态的,你可以随意的部署多台,其代码也非常简单,非常轻量。 那不禁要问了:ZooKeeper是业界用来管理集群的一个非常常用的中间件,比如Kafka就是依赖的ZK。那为什么RocketMQ要自己造轮...

最新文章