Hadoop—MapReduce计算气象温度

摘要:
Hadoop—MapReduce计算气象温度1运行环境说明1.1硬软件环境主机操作系统:MacOS64bit,8G内存虚拟软件:ParallersDesktop12虚拟机操作系统:CentOS64位,单核,512内存JDK:javaversion"1.7.0_45"Hadoop:1.1.21.2机器网络环境集群包含三个节点:1个namenode、2个datanode,其中节点之间可以相互ping通。
Hadoop—MapReduce计算气象温度

1 运行环境说明

1.1 硬软件环境

  • 主机操作系统:Mac OS 64 bit ,8G内存
  • 虚拟软件:Parallers Desktop12
  • 虚拟机操作系统:CentOS 64位,单核,512内存
  • JDK:java version "1.7.0_45"
  • Hadoop:1.1.2

1.2 机器网络环境

集群包含三个节点:1个namenode、2个datanode,其中节点之间可以相互ping通。节点IP地址和主机名分布如下:

序号IP地址机器名类型用户名运行进程
1192.168.33.200Master名称节点hahaNN、SNN、JobTracer
2192.168.33.201Slave1数据节点hahaDN、TaskTracer
3192.168.33.202Slave2数据节点hahaDN、TaskTracer
4192.168.33.203Slave3数据节点hahaDN、TaskTracer

所有节点均是CentOS6.5 64bit系统,防火墙均禁用,所有节点上均创建了一个haha用户,用户主目录是/home/haha。

2 使用MapReduce求每年最低温度

2.1 内容

下载气象数据集部分数据,写一个Map-Reduce作业,求每年的最低温度,部署并运行之.
Hadoop—MapReduce计算气象温度第1张

分析Map-Reduce过程
Hadoop—MapReduce计算气象温度第2张

Map-Reduce编程模型
Hadoop—MapReduce计算气象温度第3张

2.1.1 Map-reduce的思想就是“分而治之”

  • Mapper

    Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”执行
    “简单的任务”有几个含义:

    • 1 数据戒计算规模相对于原任务要大大缩小;
    • 2 就近计算 ,即会被分配到存放了所需数据的节点进行计算;
    • 3 这些小任务可以幵行计算,彼此间几乎没有依赖关系
  • Reducer

    对map阶段的结果进行汇总

    • Reducer的数目由mapred-site.xml配置文件里的项目mapred.reduce.tasks决定。缺 省值为1,用户可以覆盖之

2.2 运行代码

2.2.1 MinTemperature

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
publicclass MinTemperature {
   
    public staticvoid main(String[] args) throws Exception {
        if(args.length != 2) {
            System.err.println("Usage: MinTemperature<input path> <output path>");
            System.exit(-1);
        }
       
        Job job = new Job();
        job.setJarByClass(MinTemperature.class);
        job.setJobName("Min temperature");
        //new Path(args[0])控制台的第一个参数--输入路径
        FileInputFormat.addInputPath(job, new Path(args[0]));
        //new Path(args[1])控制台的第二个参数--输出路径
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //指定Mapper是哪个类
        job.setMapperClass(MinTemperatureMapper.class);
        //指定Reducer是哪个类
        job.setReducerClass(MinTemperatureReducer.class);
        //指定输出的key和value是什么
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

2.2.2 MinTemperatureMapper

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
 
public class MinTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
 
    private static final int MISSING = 9999;
   
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       
        String line = value.toString();
        String year = line.substring(15, 19);
       
        int airTemperature;
        if(line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
       
        String quality = line.substring(92, 93);
        if(airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}

2.2.3 MinTemperatureReducer

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
 
public class MinTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
 
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
       
        int minValue = Integer.MAX_VALUE;
        for(IntWritable value : values) {
            minValue = Math.min(minValue, value.get());
        }
        context.write(key, new IntWritable(minValue));
    }
}

2.3 实现过程

2.3.1 编写代码

进入/home/haha/hadoop-1.1.2/myclass目录,在该目录中建立MinTemperature.JavaMinTemperatureMapper.javaMinTemperatureReducer.java代码文件,代码内容为2.2所示,执行命令如下:

[haha@Master ~]$cd /home/haha/hadoop-1.1.2/myclass/
[haha@Master myclass]$vi MinTemperature.java
[haha@Master myclass]$vi MinTemperatureMapper.java
[haha@Master myclass]$vi MinTemperatureReducer.java

MinTemperature.java

Hadoop—MapReduce计算气象温度第4张

MinTemperatureMapper.java
Hadoop—MapReduce计算气象温度第5张

MinTemperatureReducer.java
Hadoop—MapReduce计算气象温度第6张

2.3.2编译代码

在/home/haha/hadoop-1.1.2/myclass目录中,使用如下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:

[haha@Master myclass]$javac -classpath ../hadoop-core-1.1.2.jar *.java
[haha@Master myclass]$ls
[haha@Master myclass]$mv *.jar
[haha@Master myclass]$rm *.class

2.3.4创建目录

进入/home/haha/hadoop-1.1.2/bin目录,在HDFS中创建气象数据存放路径/user/haha/in,执行命令如下:

cd /home/haha/hadoop-1.1.2/bin
hadoop fs -mkdir /user/haha/in
hadoop fs -ls /user/haha

Hadoop—MapReduce计算气象温度第7张

2.3.5解压气象数据并上传到HDFS中

使用SSH工具或者scp命令把从NCDC下载的气象数据上传到上步骤创建的目录/user/haha/in中。

使用zcat命令把这些数据文件解压并合并到一个sample.txt文件中,合并后把这个文件上传到HDFS文件系统的/usr/hadoop/in目录中:

cd /user/haha/hadoop-1.1.2/in
zcat *.gz > sample.txt
hadoop fs -copyFromLocal sample.txt /user/haha/in

气象数据具体的下载地址为 ftp://ftp3.ncdc.noaa.gov/pub/data/noaa/ ,该数据包括1900年到现在所有年份的气象数据,大小大概有70多个G。为了测试简单,我们这里选取一部分的数据进行测试

2.3.6 运行程序

以jar的方式启动MapReduce任务,执行输出目录为/user/haha/outputFile:


cd /home/haha/hadoop-1.1.2
hadoop jar MinTemperature.jar MinTemperature /usr/hadoop/in/sample.txt  outputFile

Hadoop—MapReduce计算气象温度第8张

2.3.7查看结果

执行成功后,查看/user/haha/outputFile目录中是否存在运行结果,使用cat查看结果:

[haha@Master ~]$ hadoop fs -ls /user/haha/outputFile
[haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000
[haha@Master ~]$ hadoop fs -cat /user/haha/outputFile/part-r-00000
1972	11

2.3.8通过页面结果

1. 查看jobtracker.jsp

http://master:50030/jobtracker.jsp

Hadoop—MapReduce计算气象温度第9张
已经完成的作业任务:

Hadoop—MapReduce计算气象温度第10张

任务的详细信息:
Hadoop—MapReduce计算气象温度第11张
Hadoop—MapReduce计算气象温度第12张

2.查看dfshealth.jsp

http://master:50070/dfshealth.jsp
Hadoop—MapReduce计算气象温度第13张

分别查看HDFS文件系统和日志

Hadoop—MapReduce计算气象温度第14张

Hadoop—MapReduce计算气象温度第15张

3 求温度平均值能使用combiner吗?

Q:如果求温度的平均值,能使用combiner吗?有没有变通的方法.

A:不能使用,因为求平均值和前面求最值存在差异,各局部最值的最值还是等于整体的最值的,但是对于平均值而言,各局部平均值的平均值将不再是整体的平均值了,所以不能用combiner。可以通过变通的办法使用combiner来计算平均值,即在combiner的键值对中不直接存储最后的平均值,而是存储所有值的和个数,最后在reducer输出时再用和除以个数得到平均值。

3.1 程序代码

AvgTemperature.java

import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
   
public class AvgTemperature {  
     
    public static void main(String[] args) throws Exception {  
           
        if(args.length != 2) {  
            System.out.println("Usage: AvgTemperatrue <input path><output path>");  
            System.exit(-1);  
        }  
         
        Job job = new Job();  
        job.setJarByClass(AvgTemperature.class);  
        job.setJobName("Avg Temperature");  
        FileInputFormat.addInputPath(job, new Path(args[0]));  
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  
         
        job.setMapperClass(AvgTemperatureMapper.class);  
        job.setCombinerClass(AvgTemperatureCombiner.class);  
        job.setReducerClass(AvgTemperatureReducer.class);  
         
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Text.class);  
         
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(IntWritable.class);  
         
        System.exit(job.waitForCompletion(true) ? 0 : 1);  
    }  
}  

AvgTemperatureMapper.java

import java.io.IOException;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Mapper;  
   
publicclass AvgTemperatureMapper extends Mapper<LongWritable, Text, Text, Text> {  
   
    private static final int MISSING = 9999;  
     
    @Override  
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{  
         
        String line = value.toString();  
        String year = line.substring(15, 19);  
         
        int airTemperature;  
        if(line.charAt(87) == '+') {  
            airTemperature = Integer.parseInt(line.substring(88, 92));  
        } else {  
            airTemperature =  Integer.parseInt(line.substring(87, 92));  
        }  
         
        String quality = line.substring(92, 93);  
        if(airTemperature != MISSING && !quality.matches("[01459]")) {  
            context.write(new Text(year), new Text(String.valueOf(airTemperature)));  
        }  
    }  
}  

AvgTemperatureCombiner.java

import java.io.IOException;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  
   
public class AvgTemperatureCombiner extends Reducer<Text, Text, Text, Text>{  
   
    @Override  
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
         
        double sumValue = 0;  
        long numValue = 0;  
         
        for(Text value : values) {  
            sumValue += Double.parseDouble(value.toString());  
            numValue ++;  
        }  
         
        context.write(key, new Text(String.valueOf(sumValue) + ',' + String.valueOf(numValue)));  
    }  
}  

AvgTemperatureReducer.java

import java.io.IOException;  
import org.apache.hadoop.io.IntWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.mapreduce.Reducer;  
   
public class AvgTemperatureReducer extends Reducer<Text, Text, Text, IntWritable>{  
   
    @Override  
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {  
         
        double sumValue = 0;  
        long numValue = 0;  
        int avgValue = 0;  
         
        for(Text value : values) {  
            String[] valueAll = value.toString().split(",");  
            sumValue += Double.parseDouble(valueAll[0]);  
            numValue += Integer.parseInt(valueAll[1]);  
        }  
         
        avgValue  = (int)(sumValue/numValue);  
        context.write(key, new IntWritable(avgValue));  
    }  
}  

3.2 实现过程

编写代码

进入/home/haha/hadoop-1.1.2/myclass目录,在该目录中建立AvgTemperature.java、AvgTemperatureMapper.java、AvgTemperatureCombiner.java和AvgTemperatureReducer.java代码文件,执行命令如下:

cd /usr/local/hadoop-1.1.2/myclass/
vi AvgTemperature.java
vi AvgTemperatureMapper.java
vi AvgTemperatureCombiner.java
vi AvgTemperatureReducer.java

编译代码

在/home/user/hadoop-1.1.2/myclass目录中,使用如下命令对java代码进行编译,为保证编译成功,加入classpath变量,引入hadoop-core-1.1.2.jar包:

javac -classpath ../hadoop-core-1.1.2.jar *.java
ls

Hadoop—MapReduce计算气象温度第16张

打包编译文件

把编译好class文件打包,否则在执行过程会发生错误。把打好的包移动到上级目录并删除编译好的class文件:

jar cvf ./AvgTemperature.jar ./*.class
ls
mv *.jar ..
rm *.class

Hadoop—MapReduce计算气象温度第17张

运行程序

数据使用求每年最低温度的气象数据,数据在HDFS位置为/user/haha/in/sample.txt,以jar的方式启动MapReduce任务,执行输出目录为/user/haha/out1:

cd /home/haha/hadoop-1.1.2
hadoop jar AvgTemperature.jar AvgTemperature /user/haha/in/sample.txt  /user/haha/out1

查看结果

执行成功后,查看/user/haha/out1目录中是否存在运行结果,使用cat查看结果:

hadoop fs -ls /user/haha/out1
hadoop fs -cat /user/haha/out1/part-r-00000

Hadoop—MapReduce计算气象温度第18张

免责声明:文章转载自《Hadoop—MapReduce计算气象温度》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇journalctl查看内核/应用日志 一介凡人PHP 5 Date/Time 函数下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

启动secondarynamenode时报错

环境: mac系统 + hadoop2.6.0-cdh5.7.0伪分布式 问题一: 在启动hdfs的secondarynamenode时,报错。 正常情况: sumengdeMacBook-Pro:sbin sumeng$ ./start-dfs.sh 18/06/11 21:35:00 WARN util.NativeCodeLoader: Unable...

安装 Accumulo——突破自己,就是成长

前言 在我刚开始接触分布式集群的时候,是自己在几台虚拟机中手动安装的 Hadoop 和 Spark ,所以当时对 Hadoop 的配置有个简单的印象 ,但是后面发现了 Cloudera 和 Ambari 之后(两个分布式集群自动管理工具),就再没有手动安装过。这就导致我用了很久的 Accumulo 却从未手动安装过,使用 Cloudera 安装导致我根本没...

Centos(64位)安装Hbase详细步骤

HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang 所撰写的Google论文“Bigtable:一个结构化数据的分布式存储系统”。就像Bigtable利用了Google文件系统(File System)所提供的分布式数据存储一样,HBase在Hadoop之上提供了类似于Bigtable的能力。HBase是Apache的Hadoo...

Hive 安装配置

实验简介 本次课程学习了如何安装配置 Hive。 一、实验环境说明 1. 环境登录 无需密码自动登录,系统用户名shiyanlou,密码shiyanlou 2. 环境介绍 本实验环境采用带桌面的Ubuntu Linux环境,实验中会用到桌面上的程序: XfceTerminal: Linux命令行终端,打开后会进入Bash环境,可以使用Linux命令;...

Hive入门学习随笔(一)

===什么是Hive? Hive是基于Hadoop HDFS之上的数据仓库。 我们可以把数据存储在这个基于数据的仓库之中,进行分析和处理,完成我们的业务逻辑。 本质上就是一个数据库 ===什么是数据仓库? 实际上就是一个数据库。我们可以利用数据仓库来保存我们的数据。 与一般意义上的数据库不同。数据库是一个面向主题的、集成的、不可更新的、随时间不变化的数据集...

[业界方案] ClickHouse业界解决方案学习笔记

[业界方案] ClickHouse业界解决方案学习笔记 目录 [业界方案] ClickHouse业界解决方案学习笔记 0x00 摘要 0x01 简介 0x02 OLAP场景的特点 0x03 选型原因 携程选型原因 头条选型原因 0x04 技术特点 0x05 多 数据Sharding 数据Partitioning 高吞吐写入能力 支持数据复制和...