Map phase
The map phase of the WordCount example is as follows:
public static class TokenizerMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Reduce phase
The reduce phase of the WordCount example is as follows:
public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Job invocation
The job is configured and submitted in the main function as follows:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
// set the mapper, combiner, and reducer classes
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// set the key and value types of the map output
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// set the key and value types of the final (reduce) output
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// input and output directories
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
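After packaging the program into a jar, the job is typically launched with a command along the lines of hadoop jar wordcount.jar WordCount <in> <out>; the jar name here is only illustrative.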
Other common techniques
Log output
There are two main ways to produce log output:
- Method 1:
Use the interfaces Java provides, such as System.out.print, directly, and view the corresponding task's output on Hadoop's monitoring web pages by navigating jobtracker → job detail → job task → job log.
- Method 2:
Use log4j for logging; these messages are shown while the job runs. See online references for details, e.g.:
http://www.aboutyun.com/thread-8021-1-1.html
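As a minimal sketch, assuming the log4j 1.x API bundled with Hadoop (the logger name and message are illustrative; such output typically ends up in the task's syslog rather than stdout):
import org.apache.log4j.Logger;

public static class TokenizerMapper extends
        Mapper<LongWritable, Text, Text, IntWritable> {
    // logger named after the mapper class; any class or string name works
    private static final Logger LOG = Logger.getLogger(TokenizerMapper.class);

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        LOG.info("processing line: " + value);
        // ... normal map logic ...
    }
}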
Mahout
Recommender system
time mahout recommenditembased --input input2 --output output2 --similarityClassname SIMILARITY_COSINE
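The input here is expected to be plain text with one userID,itemID,preference triple per line (Mahout's standard preference format); a tiny made-up example:
1,101,5.0
1,102,3.0
2,101,2.0
2,103,4.0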
setup and cleanup methods
Every mapper or reducer can override the setup and cleanup methods to perform initialization and final cleanup:
public void setup(Context context) {
}
public void cleanup(Context context) throws IOException,
InterruptedException {
}
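For instance, a minimal sketch (the configuration key my.threshold and the counter field are made up for illustration, and the output types are assumed to be Text/IntWritable as in the WordCount mapper):
private int threshold;
private int seen = 0;

public void setup(Context context) {
    // read a parameter the driver set with conf.setInt("my.threshold", ...)
    threshold = context.getConfiguration().getInt("my.threshold", 0);
}

public void cleanup(Context context) throws IOException,
        InterruptedException {
    // emit one summary record per task when the task finishes
    context.write(new Text("records_seen"), new IntWritable(seen));
}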
HDFS usage
- Create a FileSystem from a path and a Configuration, and close it when done:
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
fs.close();
- Check whether a directory exists and create it:
if (!fs.exists(path)) {
fs.mkdirs(path);
System.out.println("Create: " + folder);
}
- Delete an entire directory recursively:
fs.delete(path, true);
- List files:
FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
FileStatus[] list = fs.listStatus(path);
System.out.println("ls: " + folder);
System.out.println("==========================================================");
for (FileStatus f : list) {
System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
}
- Upload and download files:
fs.copyFromLocalFile(new Path(local), new Path(remote));
fs.copyToLocalFile(path, new Path(local));
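Putting these pieces together, a minimal self-contained sketch (the namenode address, directory, and file names are placeholders):
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000/"), conf);
        try {
            Path dir = new Path("/tmp/demo");
            if (!fs.exists(dir)) {
                fs.mkdirs(dir);
            }
            // upload a local file, then list the directory
            fs.copyFromLocalFile(new Path("data.txt"), new Path("/tmp/demo/data.txt"));
            for (FileStatus f : fs.listStatus(dir)) {
                System.out.printf("name: %s, folder: %s, size: %d%n",
                        f.getPath(), f.isDirectory(), f.getLen());
            }
        } finally {
            fs.close();
        }
    }
}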
Maps with two inputs
- Method 1
Use a single Mapper and obtain the input file name through the context, then branch on it (a sketch is given after this list):
FileSplit split = (FileSplit) context.getInputSplit();
String flag = split.getPath().getName(); // get the input file name
- Method 2
Use two Mappers, registering one per input path:
MultipleInputs.addInputPath(job, new Path(input1), SequenceFileInputFormat.class,
Mapper1.class);
MultipleInputs.addInputPath(job, new Path(input2), SequenceFileInputFormat.class,
Mapper2.class);
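For method 1, a minimal sketch of a mapper that branches on the input file name (the class name TwoInputMapper and the "A" file-name prefix are assumptions for illustration):
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public static class TwoInputMapper extends
        Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        FileSplit split = (FileSplit) context.getInputSplit();
        String fileName = split.getPath().getName();
        if (fileName.startsWith("A")) {
            // record comes from the first input folder
        } else {
            // record comes from the second input folder
        }
    }
}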
Custom Writable types
Users can define their own key and value types, for example:
public class PairKey implements WritableComparable {
public int index1;
public int index2;
public void write (DataOutput out)
throws IOException
{
out.writeInt(index1);
out.writeInt(index2);
}
public void readFields (DataInput in)
throws IOException
{
index1 = in.readInt();
index2 = in.readInt();
}
public int compareTo (Object other) {
PairKey o = (PairKey)other;
if (this.index1 < o.index1) {
return -1;
} else if (this.index1 > o.index1) {
return +1;
}
if (this.index2 < o.index2) {
return -1;
} else if (this.index2 > o.index2) {
return +1;
}
return 0;
}
public int hashCode () {
return (index1 << 16) + index2;
}
public String toString() {
return index1 + "\t"+ index2;
}
}
The code above defines a key type. It must implement the WritableComparable interface, which mainly means providing the readFields, write, and compareTo methods.
Similarly, a value type can be defined as follows:
private static class Value implements Writable {
public int index1;
public int index2;
public int flag;
public int v;
public void write (DataOutput out)
throws IOException
{
out.writeInt(index1);
out.writeInt(index2);
out.writeInt(flag);
out.writeInt(v);
}
public void readFields (DataInput in)
throws IOException
{
index1 = in.readInt();
index2 = in.readInt();
flag = in.readInt();
v = in.readInt();
}
}
Value types generally do not need to be compared, so implementing the Writable interface is sufficient.
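In the driver, the job then needs to be told about these types, e.g. (assuming both the map output and the final output use PairKey and Value):
job.setMapOutputKeyClass(PairKey.class);
job.setMapOutputValueClass(Value.class);
job.setOutputKeyClass(PairKey.class);
job.setOutputValueClass(Value.class);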
Exercise
Use Hadoop to implement the multiplication of two matrices, with folder A and folder B each holding one small matrix. Assume the sizes of the two matrices are known and that they can be multiplied; the case where the matrices are too large and must be multiplied block by block is not considered.
The file in folder A contains:
1 2 3 4
5 6 7 8
9 10 11 12
The file in folder B contains:
1 2
2 3
4 5
6 7
Write out the map and reduce phases and the job invocation. For the two-input handling and the custom key/value types, refer to the examples above.
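For self-checking, the expected product of the two matrices above is:
41 51
93 119
145 187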