一个完整的hadoop程序开发过程

摘要:
其功能是实现HDFS的可视化操作。没有它,您需要在终端输入大量命令。每个命令以bin/hadoopdfs开头。以下是对配置过程的简要描述:eclipse和hadoop eclipse插件的版本要求非常高,必须高度匹配才能使用。默认情况下,用户名通常不在中间。到目前为止,已经配置了eclipse的hadoop插件。以下程序源于Hadoop Practice。它们诞生的原因是,原著中的代码缺乏许多条件,没有改进就无法运行。
目的

说明hadoop程序开发过程

前提条件

ubuntu或同类OS

java1.6.0_45

eclipse-indigo

hadoop-0.20.2

hadoop-0.20.2-eclipse-plugin.jar

各项版本一定要匹配,否则出了问题都不知道是什么原因。

配置

配置Java

详见:Ubuntu下搭建JAVA开发环境及卸载

配置分布式Hadoop

详见:hadoop 0.20.2伪分布式安装详解

伪分布式与分布式有两点主要区别:

  1. 在namenode节点配置完成hadoop以后,需要用scp把hadoop复制到datanode节点,为了方便,最好全部机器的路径都是一样的,比如都在/opt/hadoop-0.20.2中。
  2. conf目录下的masters文件要把默认的localhost改成namenode节点的主机名或IP地址,Slaves文件中,要把localhost改成datanode节点的主机名或IP
eclipse的hadoop插件配置

hadoop-0.20.2-eclipse-plugin.jar是一个eclipse中的hadoop插件。

它的作用是实现了HDFS的可视化操作,如果没有它,就要在大量地在终端输入命令,每个命令都是以bin/hadoop dfs开头。

如果你是新手,可能还觉得很新鲜,如果很熟悉命令的话,就会觉得很烦。新手总会变成老手,所以这个插件还是有必要的。

下面简单说一下配置过程:

eclipse和hadoop-eclipse-plugin这套插件的版本要求非常高,一定要高度匹配才能用。另一篇博文写了一部分对应关系:https://www.cnblogs.com/Sabre/p/10621064.html

1.下载hadoop-0.20.2-eclipse-plugin.jar,自行搜索。官网不太容易找旧版本。

2.把此jar放到eclipse插件目录下,一般是plugins目录

重新启动eclipse,如果版本正确,此时在eclipse中的project exporer中应该可以看到DFS Locations项。如果没有出现,很可能是版本的问题。

3.配置Hadoop所在目录。eclipse-->window菜单-->Preferences-->Hadoop Map/Reduce,右侧输入或选择你的Hadoop目录

4.显示Map/ReduceLocations窗口。eclipse-->window菜单-->Open Perspective-->Other,选择蓝色的小象图标Map/Reduce,会在下面出黄色的小象窗口,Map/ReduceLocations

5.配置HadoopLocationMap/ReduceLocations中右键,New Hadoop Location,出现配置窗口,location name随便你写。下面的Map/Reduce Master框中的host,如果是分布式就用IP或主机名,不要用默认的localhost。port改成9000。DFS Master框中的UseM/RMaster host默认打勾保持不变,下面的Port改成9001 。user name一般默认中不中 ,

至此,eclipse的hadoop插件就配置完成了。

编写程序

以下的程序是从《hadoop实战》中脱胎出来的,之所以说脱胎,是因为原书中的代码缺少很多条件,不加以完善是无法运行的。这本书写得不好,感觉是为了评职称之类的事情,让学生给凑的,里面很多硬伤。之所以还在硬着头皮看下去,是因为多少还是讲了一些东西,同时也挑战一下自己,面对不那么完善的环境时,能否解决问题,而不是一味地寻找更好的教材,这是在豆瓣上写的一篇书评:https://book.douban.com/review/10071283/

1.打开eclipse,新建java项目。右键项目,properties,Java Builder Path,Libraries,Add External JARS,找到hadoop的目录,把根目录下的几个jar包都添加进来。

2.新建类,Score_process.java,复制粘贴以下代码:

packagepkg1;
importjava.net.URI;
importjava.util.Iterator;
importjava.util.StringTokenizer;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.conf.Configured;
importorg.apache.hadoop.fs.FileSystem; 
importorg.apache.hadoop.fs.Path;
importorg.apache.hadoop.io.IntWritable;
importorg.apache.hadoop.io.LongWritable;
importorg.apache.hadoop.io.Text;
importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;
importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;
importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
importorg.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
importorg.apache.hadoop.mapreduce.Job;
importorg.apache.hadoop.mapreduce.Mapper;
importorg.apache.hadoop.mapreduce.Reducer;
importorg.apache.hadoop.util.Tool;
importorg.apache.hadoop.util.ToolRunner;
public  class Score_process extends Configured implementsTool {
    //内部类Map
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable>{
        //map方法
        public void map(LongWritable key, Text value, Context context) throwsjava.io.IOException ,InterruptedException {
            System.out.println("key值:" +key);
            String line = value.toString();//将输入的纯文本文件的数据转化为string
            //将输入的数据按行分割
            StringTokenizer tokenizerArticle = new StringTokenizer(line, "
");
            //分别对每一行进行处理
            while(tokenizerArticle.hasMoreTokens()) {
                //每行按空格划分
                StringTokenizer tokenizerLine = newStringTokenizer(tokenizerArticle.nextToken());
                String nameString =tokenizerLine.nextToken();
                String scoreString =tokenizerLine.nextToken();
                Text name = newText(nameString); 
                int scoreInt =Integer.parseInt(scoreString);
                context.write(name, new IntWritable(scoreInt));//输出姓名和成绩
}
        };
    }
    //内部类Reduce
    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
        //reduce方法
        public void reduce(Text key, java.lang.Iterable<IntWritable> values, Context context) throwsjava.io.IOException ,InterruptedException {
            int sum=0;
            int count=0;
            Iterator<IntWritable> iterator =values.iterator();
            while(iterator.hasNext()) {
                sum +=iterator.next().get();
                count++;
            }
            int average = (int)sum/count;
            context.write(key, newIntWritable(average));
        };
    }    
    public int run(String[] args) throwsException {
        Configuration configuration =getConf();
        //configuration.set("mapred", "Score_Process.jar");
        //准备环境,删除已经存在的output2目录,保证输出目录不存在**开始************
        final String uri = "hdfs://192.168.1.8:9000/";
        FileSystem fs =FileSystem.get(URI.create(uri),configuration);
        final String path = "/user/grid/output2";
        boolean exists = fs.exists(newPath(path));
        if(exists){
            fs.delete(new Path(path),true);
        }
        //准备环境,删除已经存在的output2目录,保证输出目录不存在**结束************

        Job job= newJob(configuration);
        job.setJobName("Score_process");
        job.setJarByClass(Score_process.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
//System.out.println(new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);    
        return success ? 0:1;
    }
    public static void main(String[] args) throwsException {
        int ret = ToolRunner.run(newScore_process1(), args);
        System.exit(ret);
    }
}

以上的代码中,有不少是套路,固定的模板。

Map是处理输入参数中给定的文本文件,处理完毕后,输出到HDFS,供reduce调用。context.write(name, new IntWritable(scoreInt));这一句是关键。

Reduce调用map方法的结果,reduce后,写到OS文件系统。context.write(key, newIntWritable(average));这一句是关键。

整个run方法,需要改的只有setJobName和setJarByClass类的名字,其他的不用动。

整个main方法,不用动。

程序部分基本上就是这样。

编译

终端中输入

javac -classpath /opt/hadoop-0.20.2/hadoop-0.20.2-core.jar -d ~/allTest/ScoreProcessFinal/class ~/workspace-indigo/test5/src/pkg1/Score_process.java

如果没有报错,就说明编译成功。

打包
jar -cvf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar -C ~/allTest/ScoreProcessFinal/class .

可以用以下命令查看包里的文件:
jar vtf ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar

执行

执行可以分为两种方式,一种在eclipse中,另一种在终端。

eclipse中运行

配置运行参数。run configurations,arguments,Program arguments:

文本框中输入:hdfs://host-thinkpad:9000/user/grid/input2 hdfs://host-thinkpad:9000/user/grid/output2

就是输入目录和输出目录,注意中间有个空格。

终端中运行

/opt/hadoop-0.20.2/bin/hadoop jar ~/allTest/ScoreProcessFinal/ScoreProcessFinal.jar pkg1.Score_process1 input2 output2

这就是hadoop开发的全过程框架。

其实在此期间发生了很多各种各样的问题,分别记录在各个博文中了。

免责声明:文章转载自《一个完整的hadoop程序开发过程》仅用于学习参考。如对内容有疑问,请及时联系本站处理。

上篇10个顶级的CSS和Javascript动画框架推荐查询sql连接数下篇

宿迁高防,2C2G15M,22元/月;香港BGP,2C5G5M,25元/月 雨云优惠码:MjYwNzM=

相关文章

cloudera目录位置

http://www.aboutyun.com/thread-9189-1-1.html 这里来的嘿嘿。 1. 相关目录/var/log/cloudera-scm-installer : 安装日志目录。/var/log/* : 相关日志文件(相关服务的及CM的)。/usr/share/cmf/ : 程序安装目录。/usr/lib64/cmf/ : Age...

Hadoop学习之路(八)在eclispe上搭建Hadoop开发环境

一、添加插件 将hadoop-eclipse-plugin-2.7.5.jar放入eclipse的plugins文件夹中 二、在Windows上安装Hadoop2.7.5 版本最好与Linux集群中的hadoop版本保持一致 1、将hadoop-2.7.5-centos-6.7.tar.gz解压到Windows上的C盘software目录中 2、配置h...

卸载Ambari集群

清理ambari安装的hadoop集群本文针对redhat或者centos 对于测试集群,如果通过ambari安装hadoop集群后,想重新再来一次的话,需要清理集群。 对于安装了很多hadoop组件的话,这个工作很繁琐。接下来是我整理的清理过程。 1,通过ambari将集群中的所用组件都关闭,如果关闭不了,直接kill -9 XXX 2,关闭ambari...

Apache Beam是什么?

  不多说,直接上干货! 以下是Apache Beam的官网 : https://beam.apache.org/ Apache Beam的前世今生       Apache Beam前身是Google Dataflow SDK,DataFlow是谷歌的提供大数据计算平台。在DataFlow之前,谷歌的批处理和流处理(流计算,实时处理)使用了不同系统,流...

CDH集群主节点宕机恢复

1       情况概述 公司的开发集群在周末莫名其妙的主节点Hadoop-1的启动固态盘挂了,由于CM、HDFS的NameNode、HBase的Master都安装在Hadoop-1,导致了整个集群都无法使用,好在数据不在启动盘。 Hadoop-1的系统必须重装,但是不能重装集群,因为要将之前的数据全部保留恢复,所以只能通过集群恢复的手段将集群重新跑起来。...

VMware虚拟机搭建Spark集群

目录 一、搭建方法 二、准备 三、系统环境配置 四、软件安装与配置 a. 软件下载安装 b. Hadoop配置 c. Spark配置 五、虚拟机克隆 六、启动集群 七、踩坑经历 一、搭建方法 在虚拟机上搭建集群的方法通常有两种1.类似于真实的机器上部署,首先要进行密钥授权使各台机器之间能够免密码相互访问,然后在主节点上将各个软件配置好,分发各...