In an earlier post I walked through how a MapReduce program executes at runtime. Now let's write a wordcount program ourselves; I have annotated the code in detail.
Without further ado, straight to the code (note that this driver class neither extends nor implements anything):
package com.superyong.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Word-count MapReduce program.
 * Recall the wordcount example we ran on the virtual machine earlier:
 * bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar wordcount /datas/mapreduce_test/input /datas/mapreduce_test/output/output01
 * The parts of that command, in order: the execution platform (yarn) -> the jar subcommand -> the path to the jar -> the class to run -> the input path (on HDFS) -> the output path for the results (on HDFS).
 * Note: the official examples jar maps short names such as "wordcount" to driver classes. Our jar sets up no such mapping, so when running it we must pass the fully qualified class name.
 */
public class WordCountMapReduce {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //First check the argument count. As noted above, running this program
        //requires the input path and the result path; both arrive through the
        //args of main and can be set in the run configuration when launching.
        if (args.length < 2) {
            System.out.println( "Usage: WordCountMapReduce <in> <out>" );
            return;
        }
        //Load the configuration; be careful to import the Hadoop Configuration class, not some other one
        Configuration configuration = new Configuration();
        //Create a job
        /*
            import org.apache.hadoop.mapred.jobcontrol.Job;   -> Hadoop 1.x
            import org.apache.hadoop.mapreduce.Job;           -> Hadoop 2.x
            Both packages contain a Job class, but one belongs to the old 1.x API
            and the other to the 2.x API, so watch which one you import.
        */
        // public static Job getInstance( Configuration conf, String jobName )
        // The two parameters are the configuration and the job name; after submission
        // the job shows up on the 8088 web UI under this name.
        Job job = Job.getInstance( configuration, "WordCount" );
        //Set the main class that the job runs
        job.setJarByClass( WordCountMapReduce.class );

        //Configure the job
        //input
        Path inputPath = new Path( args[0] ); //directory of the input file(s)
        FileInputFormat.setInputPaths( job, inputPath );

        //The map/reduce type settings below can be filled in after the Mapper and
        //Reducer are written; if the types are already known, set them right away:
        //map
        job.setMapperClass( WordCountMapper.class );
        job.setMapOutputKeyClass( Text.class );
        job.setMapOutputValueClass( IntWritable.class );
        //reduce
        job.setReducerClass( WordCountReducer.class );
        job.setOutputKeyClass( Text.class );
        job.setOutputValueClass( IntWritable.class );
        //output
        Path outputPath = new Path( args[1] ); //directory where the result files will be stored
        FileOutputFormat.setOutputPath( job, outputPath );
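        //Note: the output directory must not already exist on HDFS; otherwise
        //FileOutputFormat rejects the job with a FileAlreadyExistsException.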
        //Submit the job and print its progress while waiting for completion
        boolean isSuccess = job.waitForCompletion(true);
        //Exit with a status code reflecting success or failure
        System.exit( isSuccess ? 0 : 1 );
    }
    /*
        Create a class WordCountMapper extending Mapper and override the map method.
        public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        Type parameters:
        KEYIN
            the byte offset of each line in the file, represented as LongWritable (Hadoop's wrapper for long)
        VALUEIN
            the content of each line, essentially a string, represented as Hadoop's Text type
        KEYOUT
            the type of the key that map outputs; here it is a word, so Text
        VALUEOUT
            the type of the value that map outputs; here it is the count 1, so IntWritable (Hadoop's wrapper for int)
    */
    private static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        //Define two reusable output objects so the map method does not allocate new ones on every call:
        private Text mapOutPutKey = new Text();
        //private IntWritable mapOutPutValue = new IntWritable( 1 );
        //Small optimization: every value is 1, so one shared constant is enough
        private final static IntWritable mapOutPutValue = new IntWritable( 1 );
        //Override the map method in this class:
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //Split the line content and emit one (word, 1) pair per word
            //get the line content:
            String line = value.toString();
            //split the line; this assumes words are separated by single spaces
            //(split("\\s+") would be more robust against repeated whitespace):
            String[] items = line.split(" ");
            //iterate over the words of the line
            for (String word : items) {
                //context is used to emit the result of the map method
                // public void write(KEYOUT key, VALUEOUT value)
                //the types differ from String/int, so we reuse the objects defined above
                //set the key:
                mapOutPutKey.set( word );
                //mapOutPutValue needs no set() call, since every count is 1
                context.write( mapOutPutKey, mapOutPutValue );
            }
        }
    }
    /*
        Create a class WordCountReducer extending Reducer and override the reduce method.
        public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
        Type parameters:
        KEYIN
            the output key of map becomes the input key of reduce; here, the word
        VALUEIN
            the collection of values for the same key (word): word <1,1,1>
        KEYOUT
            the final output key, which is the word
        VALUEOUT
            the number of occurrences of the word
    */
    private static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        //Define a reusable output object for the reduce method:
        private IntWritable outPutValue = new IntWritable();

        //Override the reduce method in this class:
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            //sum counts the occurrences of each word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            //emit the result through context;
            //the value type must be IntWritable, so reuse the object defined above
            outPutValue.set( sum );
            context.write( key, outPutValue );
        }
    }
}
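One remark before packaging. I said at the top that this driver neither extends nor implements anything; as a consequence, the run log further below will show the warning "Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner". Here is a minimal sketch of the same driver rewritten with Tool and ToolRunner, which silences that warning and enables generic options such as -D property overrides. The class name WordCountTool is mine, not part of the original code, and it assumes the WordCountMapper and WordCountReducer above are made accessible to it (e.g., moved to the top level or no longer private):

package com.superyong.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("Usage: WordCountTool <in> <out>");
            return 1;
        }
        //getConf() returns the Configuration already populated by ToolRunner
        Job job = Job.getInstance(getConf(), "WordCount");
        job.setJarByClass(WordCountTool.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        job.setMapperClass(WordCountMapper.class);   //the Mapper written above
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(WordCountReducer.class); //the Reducer written above
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        //ToolRunner parses the generic options (-D, -files, ...) before calling run()
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}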
Once the code is written, package it with Maven for testing:
E:\MyCode\Hadoop>mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Hadoop 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ Hadoop ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 2 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.7.0:compile (default-compile) @ Hadoop ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ Hadoop ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory E:\MyCode\Hadoop\src\test\resources
[INFO]
[INFO] --- maven-compiler-plugin:3.7.0:testCompile (default-testCompile) @ Hadoop ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.20.1:test (default-test) @ Hadoop ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ Hadoop ---
[INFO] Building jar: E:\MyCode\Hadoop\target\Hadoop-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.491 s
[INFO] Finished at: 2019-03-04T10:13:05+08:00
[INFO] Final Memory: 15M/225M
[INFO] ------------------------------------------------------------------------
Packaging is done. Next, copy the jar to the cluster (here it was placed in a test_jar/ directory under the Hadoop installation) and run it:
[super-yong@bigdata-01 hadoop-2.7.3]$ bin/yarn jar test_jar/Hadoop-1.0-SNAPSHOT.jar com.superyong.mapreduce.WordCountMapReduce /datas/mapreduce_test/input /datas/mapreduce_test/output/output02
19/03/04 10:38:03 INFO client.RMProxy: Connecting to ResourceManager at bigdata-03.superyong.com/192.168.59.13:8032
19/03/04 10:38:04 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/03/04 10:38:05 INFO input.FileInputFormat: Total input paths to process : 1
19/03/04 10:38:05 INFO mapreduce.JobSubmitter: number of splits:1
19/03/04 10:38:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1551724410290_0001
19/03/04 10:38:06 INFO impl.YarnClientImpl: Submitted application application_1551724410290_0001
19/03/04 10:38:06 INFO mapreduce.Job: The url to track the job: http://bigdata-03.superyong.com:8088/proxy/application_1551724410290_0001/
19/03/04 10:38:06 INFO mapreduce.Job: Running job: job_1551724410290_0001
19/03/04 10:38:24 INFO mapreduce.Job: Job job_1551724410290_0001 running in uber mode : false
19/03/04 10:38:24 INFO mapreduce.Job: map 0% reduce 0%
19/03/04 10:38:34 INFO mapreduce.Job: map 100% reduce 0%
19/03/04 10:38:47 INFO mapreduce.Job: map 100% reduce 100%
19/03/04 10:38:48 INFO mapreduce.Job: Job job_1551724410290_0001 completed successfully
19/03/04 10:38:48 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=538
		FILE: Number of bytes written=238859
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=416
		HDFS: Number of bytes written=122
		HDFS: Number of read operations=6
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=7594
		Total time spent by all reduces in occupied slots (ms)=9187
		Total time spent by all map tasks (ms)=7594
		Total time spent by all reduce tasks (ms)=9187
		Total vcore-milliseconds taken by all map tasks=7594
		Total vcore-milliseconds taken by all reduce tasks=9187
		Total megabyte-milliseconds taken by all map tasks=7776256
		Total megabyte-milliseconds taken by all reduce tasks=9407488
	Map-Reduce Framework
		Map input records=10
		Map output records=43
		Map output bytes=446
		Map output materialized bytes=538
		Input split bytes=142
		Combine input records=0
		Combine output records=0
		Reduce input groups=15
		Reduce shuffle bytes=538
		Reduce input records=43
		Reduce output records=15
		Spilled Records=86
		Shuffled Maps =1
		Failed Shuffles=0
		Merged Map outputs=1
		GC time elapsed (ms)=1117
		CPU time spent (ms)=5020
		Physical memory (bytes) snapshot=441966592
		Virtual memory (bytes) snapshot=4208967680
		Total committed heap usage (bytes)=276299776
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=274
	File Output Format Counters
		Bytes Written=122
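One detail worth noticing in the counters above: Combine input records=0, so all 43 map output records crossed the shuffle unaggregated. Because summing counts is associative and commutative, our Reducer could also serve as a combiner to pre-aggregate map output locally and shrink the shuffle. This is an optional one-line addition to the driver, before job submission (a sketch, not in the original code):

        //Optional: reuse the Reducer as a combiner to pre-aggregate map output
        job.setCombinerClass( WordCountReducer.class );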
Here is the run information shown on the 8088 web UI:
The input file:
The run result:
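If the screenshots are not at hand, the result can also be checked from the command line. With the default TextOutputFormat each output line is word<TAB>count, and a single reducer writes one file named part-r-00000:

[super-yong@bigdata-01 hadoop-2.7.3]$ bin/hdfs dfs -cat /datas/mapreduce_test/output/output02/part-r-00000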