Writing a WordCount MapReduce Program

This post walks through writing a WordCount MapReduce program: the code is explained with a complete worked example (no reliance on any special inheritance or interfaces), then the program is packaged, tested, and the run results are shown.


In an earlier post I covered how a MapReduce program runs at execution time. Now let's write a wordcount program ourselves; I've annotated the code in detail:

Without further ado, straight to the code (the driver class itself neither extends nor implements anything):

package com.superyong.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Word-count MapReduce program.
 * Remember the wordcount example we ran on the VM earlier?
 * bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.3.jar wordcount /datas/mapreduce_test/input /datas/mapreduce_test/output/output01
 * The pieces of that command, in order: the yarn launcher -> the jar subcommand -> the absolute path of the jar -> the program (class) to run -> the input path (on HDFS) -> the output path for the results (on HDFS).
 * Note: the official examples jar registers short program names such as "wordcount"; our jar sets no default main class, so when running it we must pass the fully qualified class name.
 */
public class WordCountMapReduce {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //check the argument count; as noted above,
        //the program needs an input directory and an output directory,
        //which arrive through main's args and can be set in the run configuration
        if (args.length < 2){
            System.err.println( "Usage: WordCountMapReduce <in> <out>" );
            return;
        }

        //load the configuration; careful not to import the wrong package -- it must be the Hadoop one!
        Configuration configuration = new Configuration();

        //create a job
        /*
          import org.apache.hadoop.mapred.jobcontrol.Job;   -- old (mapred) API
          import org.apache.hadoop.mapreduce.Job;           -- new (mapreduce) API
          careful: both classes are named Job, but one belongs to the old
          mapred API and the other to the newer mapreduce API -- use the latter
         */
        //  public static Job getInstance( Configuration conf, String jobName )
        //  takes the configuration plus the job name, which is what you will
        //  see on the 8088 web UI once the application is running
        Job job = Job.getInstance( configuration, "WordCount" );

        //set the main class of the job (lets Hadoop locate the jar by class)
        job.setJarByClass( WordCountMapReduce.class );

        //configure the job:

        //input
        Path inputPath = new Path( args[0] );//the input directory
        FileInputFormat.setInputPaths( job,inputPath );

        //the next settings can also wait until the Mapper/Reducer code is
        //written; if you already know the types, set them right away:
        //map
        job.setMapperClass( WordCountMapper.class );
        job.setMapOutputKeyClass( Text.class );
        job.setMapOutputValueClass( IntWritable.class );

        //reduce
        job.setReducerClass( WordCountReducer.class );
        job.setOutputKeyClass( Text.class );
        job.setOutputValueClass( IntWritable.class );

        //output
        Path outputPath = new Path( args[1] );//the directory where results will be stored
        FileOutputFormat.setOutputPath(job,outputPath);

        //submit the job and wait for it, printing progress along the way
        boolean isSuccess = job.waitForCompletion(true);
        //exit with 0 on success, 1 on failure
        System.exit( isSuccess?0:1 );

    }

    /*
        WordCountMapper extends the Mapper class and overrides the map method.
        public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     Type parameters:
     KEYIN
        the byte offset of each line in the file, as LongWritable
        (Hadoop's serializable wrapper around long)
     VALUEIN
        the content of each line -- essentially a string -- as Hadoop's Text type
     KEYOUT
        the type of the key that map outputs; here it is a word, so Text
     VALUEOUT
        the type of the value that map outputs; here the count 1,
        as IntWritable (Hadoop's wrapper around int)
     */
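    // Example: the input line "hadoop spark hadoop" makes map() emit
    // ("hadoop",1), ("spark",1), ("hadoop",1).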
    private static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        //define two reusable output objects for the context.write() calls in map():
        private Text mapOutPutKey = new Text();
        //private IntWritable mapOutPutValue = new IntWritable( 1 );
        //small optimization: every value is 1, so instead of creating a new
        //object each time, share a single constant instance
        private final static IntWritable mapOutPutValue = new IntWritable( 1 );

        //override the map method in this class:
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //split the line content and emit a pair for each word
            //get the line content:
            String line = value.toString();

            //split the line (this assumes words are separated by single
            //spaces; "\\s+" would also tolerate runs of whitespace):
            String[] items = line.split(" ");
            //iterate over the words:
            for (String word : items) {
                //emit the map result through context
                //  public void write(KEYOUT key, VALUEOUT value)

                //the Java String has to be converted to Hadoop types,
                //so reuse the objects defined above:
                mapOutPutKey.set( word );
                //mapOutPutValue needs no set() call -- it is always 1
                context.write(mapOutPutKey,mapOutPutValue);
            }
        }

    }

    /*
        WordCountReducer extends the Reducer class and overrides the reduce method.
        public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
        Type parameters:
        KEYIN
            the map output key becomes the reduce input key; here, a word
        VALUEIN
            the collection of values sharing the same key: word <1,1,1>
        KEYOUT
            the final output key, again the word
        VALUEOUT
            the word's total number of occurrences
     */
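    // Example: reduce() then receives ("hadoop", <1,1>) and emits ("hadoop", 2).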
    private static class WordCountReducer extends Reducer< Text,IntWritable,Text,IntWritable> {

        //define a reusable output object for the context.write() call in reduce():
        private IntWritable outPutValue = new IntWritable();

        //override the reduce method in this class:
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

            //sum accumulates how many times this word occurred
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }

            //emit through context; the int has to be converted to
            //IntWritable, via the reusable object defined above:
            outPutValue.set( sum );
            context.write( key , outPutValue );
        }
    }

}
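Before packaging, the Hadoop client libraries have to be on the compile classpath. Here is a minimal pom.xml dependency sketch -- the artifact choice and the provided scope are my assumptions, with the version matched to the 2.7.3 cluster used below:

<dependencies>
    <!-- assumed dependency block; version matches the cluster's Hadoop 2.7.3 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.3</version>
        <!-- provided: the cluster already ships these jars at run time -->
        <scope>provided</scope>
    </dependency>
</dependencies>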

With the code and the build file in place, package the project for testing:

E:\MyCode\Hadoop>mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Hadoop 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ Hadoop ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 2 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.7.0:compile (default-compile) @ Hadoop ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ Hadoop ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory E:\MyCode\Hadoop\src\test\resources
[INFO]
[INFO] --- maven-compiler-plugin:3.7.0:testCompile (default-testCompile) @ Hadoop ---
[INFO] Nothing to compile - all classes are up to date
[INFO]
[INFO] --- maven-surefire-plugin:2.20.1:test (default-test) @ Hadoop ---
[INFO] No tests to run.
[INFO]
[INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ Hadoop ---
[INFO] Building jar: E:\MyCode\Hadoop\target\Hadoop-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 7.491 s
[INFO] Finished at: 2019-03-04T10:13:05+08:00
[INFO] Final Memory: 15M/225M
[INFO] ------------------------------------------------------------------------

Packaging succeeded. Next, copy the jar to the cluster (here, into test_jar/ under the Hadoop home) and test it.
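If the input directory does not exist on HDFS yet, create it and upload a text file first (wordcount.txt is just a placeholder name for whatever file you want to count):

[super-yong@bigdata-01 hadoop-2.7.3]$ bin/hdfs dfs -mkdir -p /datas/mapreduce_test/input
[super-yong@bigdata-01 hadoop-2.7.3]$ bin/hdfs dfs -put wordcount.txt /datas/mapreduce_test/input

With the input in place, submit the job, passing the fully qualified class name: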

[super-yong@bigdata-01 hadoop-2.7.3]$ bin/yarn jar test_jar/Hadoop-1.0-SNAPSHOT.jar com.superyong.mapreduce.WordCountMapReduce /datas/mapreduce_test/input /datas/mapreduce_test/output/output02
19/03/04 10:38:03 INFO client.RMProxy: Connecting to ResourceManager at bigdata-03.superyong.com/192.168.59.13:8032
19/03/04 10:38:04 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
19/03/04 10:38:05 INFO input.FileInputFormat: Total input paths to process : 1
19/03/04 10:38:05 INFO mapreduce.JobSubmitter: number of splits:1
19/03/04 10:38:05 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1551724410290_0001
19/03/04 10:38:06 INFO impl.YarnClientImpl: Submitted application application_1551724410290_0001
19/03/04 10:38:06 INFO mapreduce.Job: The url to track the job: http://bigdata-03.superyong.com:8088/proxy/application_1551724410290_0001/
19/03/04 10:38:06 INFO mapreduce.Job: Running job: job_1551724410290_0001
19/03/04 10:38:24 INFO mapreduce.Job: Job job_1551724410290_0001 running in uber mode : false
19/03/04 10:38:24 INFO mapreduce.Job:  map 0% reduce 0%
19/03/04 10:38:34 INFO mapreduce.Job:  map 100% reduce 0%
19/03/04 10:38:47 INFO mapreduce.Job:  map 100% reduce 100%
19/03/04 10:38:48 INFO mapreduce.Job: Job job_1551724410290_0001 completed successfully
19/03/04 10:38:48 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=538
                FILE: Number of bytes written=238859
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=416
                HDFS: Number of bytes written=122
                HDFS: Number of read operations=6
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters
                Launched map tasks=1
                Launched reduce tasks=1
                Data-local map tasks=1
                Total time spent by all maps in occupied slots (ms)=7594
                Total time spent by all reduces in occupied slots (ms)=9187
                Total time spent by all map tasks (ms)=7594
                Total time spent by all reduce tasks (ms)=9187
                Total vcore-milliseconds taken by all map tasks=7594
                Total vcore-milliseconds taken by all reduce tasks=9187
                Total megabyte-milliseconds taken by all map tasks=7776256
                Total megabyte-milliseconds taken by all reduce tasks=9407488
        Map-Reduce Framework
                Map input records=10
                Map output records=43
                Map output bytes=446
                Map output materialized bytes=538
                Input split bytes=142
                Combine input records=0
                Combine output records=0
                Reduce input groups=15
                Reduce shuffle bytes=538
                Reduce input records=43
                Reduce output records=15
                Spilled Records=86
                Shuffled Maps =1
                Failed Shuffles=0
                Merged Map outputs=1
                GC time elapsed (ms)=1117
                CPU time spent (ms)=5020
                Physical memory (bytes) snapshot=441966592
                Virtual memory (bytes) snapshot=4208967680
                Total committed heap usage (bytes)=276299776
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters
                Bytes Read=274
        File Output Format Counters
                Bytes Written=122
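The job completed successfully. To check the result, read the output files straight from HDFS; part-r-00000 is the default name of the single reducer's output file. Each of its lines holds a word, a tab, and the count -- this run produced 15 such lines (Reduce output records=15 in the counters above):

[super-yong@bigdata-01 hadoop-2.7.3]$ bin/hdfs dfs -ls /datas/mapreduce_test/output/output02
[super-yong@bigdata-01 hadoop-2.7.3]$ bin/hdfs dfs -cat /datas/mapreduce_test/output/output02/part-r-00000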

(Screenshots omitted here: the application's run info on the 8088 web UI, the input file, and the run result.)
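One last note: the job log above warns "Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this." Below is a minimal sketch of a driver in that style -- it assumes the WordCountMapper and WordCountReducer above are made package-visible (drop the private modifier) so another class in the package can reference them:

package com.superyong.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: WordCountTool <in> <out>");
            return 1;
        }
        // getConf() already reflects any -D key=value generic options
        // that ToolRunner stripped from the command line
        Job job = Job.getInstance(getConf(), "WordCount");
        job.setJarByClass(WordCountTool.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // same mapper/reducer as before (assumes they are package-visible)
        job.setMapperClass(WordCountMapReduce.WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(WordCountMapReduce.WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic options, then calls run() with the rest
        System.exit(ToolRunner.run(new Configuration(), new WordCountTool(), args));
    }
}

Launched the same way with bin/yarn jar, this version also accepts generic options such as -D mapreduce.job.reduces=2 before the input path, which silences the warning above.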
