我们根据wordcount的mapreduce程序来编写一个MapReduce的通用模板,方便之后的使用:
首先说明一下之前的博客中有提到过有三种编写MapReduce的模板:
---------------------------------------------------------------------
三种编写MapReduce模板:
Driver:
-》不继承也不实现
-》继承和实现 - 官方推荐
extends Configured implements Tool
-》不继承只实现 - 企业用的最多
implements Tool
--------------------------------------------------------------------
所以今天要写的模板就是官方推荐的模板:
不多说了,直接上图上代码:
1.先创建好文件夹,准备好三个类
MRDriver:
package com.superyong.template;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MRDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
/*
读取配置文件信息;注意别导错包!是hadoop的包
Configuration configuration = new Configuration();
已经继承了 Configured ,所以就不用创建 Configuration 类了
*/
//创建一个job任务
/*
import org.apache.hadoop.mapred.jobcontrol.Job; hadoop 1.x
import org.apache.hadoop.mapreduce.Job; hadoop 2.x
注意这两包:虽然都是job类,但是一个是1.0版本的hadoop包一个是2.0版本的hadoop包
*/
// public static Job getInstance( Configuration conf, String jobName )
// 这两个参数一个是配置文件,一个是application中的job任务名,运行程序之后就可以在8088上看到
Job job = Job.getInstance( this.getConf(), "MRDriver" );
//设置Job任务运行的主类
job.setJarByClass( MRDriver.class );
//设置job
//input 设置文件读取路径
Path inputPath = new Path( args[0] );//获取被执行文件的目录
FileInputFormat.setInputPaths( job,inputPath );
//这两个可以等代码写完之后在进行设置,如果已经知道类型就直接设置:
//map 设置运行的 map 任务的类,指定 map 输出的 key,value 类型
job.setMapperClass( MRMapper.class );
job.setMapOutputKeyClass( null );
job.setMapOutputValueClass( null );
//reduce 设置运行的 reduce 任务的类,指定 reduce 输出的 key,value 类型
job.setReducerClass( MRReducer.class );
job.setOutputKeyClass( null );
job.setOutputValueClass( null );
//output
Path outputPath = new Path( args[1] );//获取结果文件存放的目录
//如果输出目录存在,先删除原有的目录
/*
首先获取文件系统
FileSystem fileSystem = FileSystem.get(configuration);
因为这个类继承了 Configured ,所以直接用 this.getConf() 去获取就好
接下来判断文件目录是否存在,存在就删除
*/
FileSystem fileSystem = FileSystem.get(this.getConf());
if (fileSystem.exists(outputPath)) {
//删除目录和删除文件的命令不一样,我们删除目录就设置为true,文件就设置为false:
fileSystem.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job,outputPath);
//提交任务,显示任务进度
boolean isSuccess = job.waitForCompletion(true);
//打印执行状态
return isSuccess?0:1;
}
public static void main(String[] args) {
//这里判断一下参数的个数,上面说过,
//运行这个程序需要指定 ‘被运行文件’ 和‘结果文件’的目录,
//这两个参数通过main函数中的args传递,可以在执行时打开配置来设置这两个参数!
if (args.length < 2){
System.out.println( "Usage:MRDriver <in> [<in>...] <out>" );
return;
}
/*
下面我们来调用写好的 run
public static int run(Configuration conf, Tool tool, String[] args)
需要三个参数Configuration conf, Tool tool, String[] args
我们已经实现了 Tool ,就是 MRDriver ,所以直接new他就好不需要再实现
*/
Configuration configuration = new Configuration();
try {
//这里的状态就是 return isSuccess?0:1;执行状态码
int status = ToolRunner.run(configuration, new MRDriver(), args);
System.exit( status );
} catch (Exception e) {
e.printStackTrace();
}
}
}
MRMapper:
package com.superyong.template;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
//为了不报错,随便给了四个类型,类型根据自己的业务需求进行设置
public class MRMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//todo:实现你需要的业务
}
}
MRReduce:
package com.superyong.template;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
//为了不报错,随便给了四个类型,类型根据自己的业务需求进行设置
public class MRReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//todo:实现你需要的业务
}
}
以上就是完整的 MapReduce 模板。