Hadoop入门系列(4) -- MapReduce详解

本文详细介绍了Hadoop MapReduce的工作流程,包括Map过程、Reduce过程、Job的调用方式,以及日志输出、Mahout应用、setup和cleanup阶段、HDFS操作、双输入Map和自定义Writable等实用技巧。并提供了矩阵乘法的MapReduce实现作为实践练习。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >


Map过程

wordCount样例中的map过程如下

public static class TokenizerMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

Reduce过程

wordCount样例中的reduce过程如下:


public static class IntSumReducer extends
        Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

Job调用

main函数中调用job的过程:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
    if (otherArgs.length != 2) {
        System.err.println("Usage: wordcount <in> <out>");
        System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    # 设置mapper,combiner,reducer类
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    # 设置map输出的key和value类型
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    # 设置reduce的key和value的类型
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    # 输入文件夹和输出文件夹
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}

其他常用技巧

日志输出

日志输出主要有两种方法:

  • 方法1:

直接使用System.out.print的之类java提供的接口,并在hadoop的监控网页上查看对应task的输出,如

jobtracker

这里写图片描述

job detail

这里写图片描述

job task

这里写图片描述

job log

这里写图片描述

  • 方法2:

使用log4j来打印,这部分日志会在运行时显示出来,具体查找网上资料,如:

http://www.aboutyun.com/thread-8021-1-1.html

mahout

推荐系统

time mahout recommenditembased --input input2 --output output2 --similarityClassname SIMILARITY_COSINE

setup,cleanup过程

每个maper或reducer可以使用setup和cleanup过程,来进行初始化或最后的cleanup:

public void setup(Context context) {

}

public void cleanup(Context context) throws IOException,
        InterruptedException {
}

hdfs使用

  • 根据路径和Configuration创建fs,使用完成close:
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
fs.close()
  • 判断目录、创建目录:
if (!fs.exists(path)) {
    fs.mkdirs(path);
    System.out.println("Create: " + folder);
}
  • 删除整个目录:
fs.deleteOnExit(path);
  • 列出文件:
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
FileStatus[] list = fs.listStatus(path);
System.out.println("ls: " + folder);
System.out.println("==========================================================");
for (FileStatus f : list) {
    System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
}
  • 上传文件,下载文件:
fs.copyFromLocalFile(new Path(local), new Path(remote));
fs.copyToLocalFile(path, new Path(local));

双输入map

  • 方法1

使用1个Map,并判断通过context的接口获取输入文件名:

FileSplit split = (FileSplit) context.getInputSplit();
flag = split.getPath().getName();   //取得输入文件名
  • 方法2

使用2个Map,如下:

MultipleInputs.addInputPath(job, new Path(input2), SequenceFileInputFormat.class,
        Mapper1.class);
MultipleInputs.addInputPath(job, new Path(input1), SequenceFileInputFormat.class, 
        Mapper2.class);

自定义writable

用户可以自定义key和value的类型,如:

public class PairKey implements WritableComparable {
    public int index1;
    public int index2;
    public void write (DataOutput out)
        throws IOException
    {
        out.writeInt(index1);
        out.writeInt(index2);
    }
    public void readFields (DataInput in)
        throws IOException
    {
        index1 = in.readInt();
        index2 = in.readInt();
    }
    public int compareTo (Object other) {
        PairKey o = (PairKey)other;
        if (this.index1 < o.index1) {
            return -1;
        } else if (this.index1 > o.index1) {
            return +1;
        }
        if (this.index2 < o.index2) {
            return -1;
        } else if (this.index2 > o.index2) {
            return +1;
        }
        return 0;
    }
    public int hashCode () {
        return index1 << 16 + index2;
    }

    public String toString() {
        return index1 + "\t"+ index2;
      }
}

上面代码定义一个key的类型,它要实现WritableComparable接口,主要包含read,write和compareTo接口

类似的,value类型的定义如下:

private static class Value implements Writable {
    public int index1;
    public int index2;
    public int flag;
    public int v;
    public void write (DataOutput out)
        throws IOException
    {
        out.writeInt(index1);
        out.writeInt(index2);
        out.writeInt(flag);
        out.writeInt(v);
    }
    public void readFields (DataInput in)
        throws IOException
    {
        index1 = in.readInt();
        index2 = in.readInt();
        flag = in.readInt();
        v = in.readInt();
    }
}

value类型一般不需要比较,所以只要实现Writable接口即可


练习

使用Hadoop实现两个矩阵的乘法运算,文件夹1和文件夹2分别存放两个小矩阵。这里假设两个矩阵的大小已知,且可以相乘。而且不考虑矩阵太大需要分块乘的情况:

A文件夹中文件内容

1 2 3 4
5 6 7 8
9 10 11 12

B文件夹中文件内容如下

1 2 
2 3
4 5
6 7

写出map过程和reduce过程,及job调用。其中双输入和key、value的定制可以参考上面的例子。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值