两个表：a 表（字段 name、id），b 表（字段 id、address）。
a b
代码
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.Iterator; public class Join { private static class JoinMap extends Mapper<Object, Text, Text, Text> { protected void map(Object key, Text values, Context context) throws IOException, InterruptedException { String line = values.toString(); if (line.contains("name") == true || line.contains("id")) { return; } String[] str = line.split(","); String flag = new String(); if (str[0].length() == 1) { flag = "2"; context.write(new Text(str[0]), new Text(flag + "+" + str[1])); } else if (str[0].length() > 1) { flag = "1"; context.write(new Text(str[1]), new Text(flag + "+" + str[0])); } } } private static class JionReduce extends Reducer<Text, Text, Text, Text> { // 实现reduce函数 private int time = 0; public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 输出表头 if (0 == time) { context.write(new Text("name"), new Text("id")); time++; } int namenum = 0; String[] name = new String[10]; int addressnum = 0; String[] address = new String[10]; for(Text value:values) { String[] str2 = value.toString().split("\\+"); if (str2[0].compareTo("1") == 0) { name[namenum] = str2[1]; namenum++; } if (str2[0].compareTo("2") == 0) { address[addressnum] = str2[1]; addressnum++; } } //循环必须正确 if (0 != namenum && 0 != addressnum) { for (int m = 0; m < namenum; m++) { for (int n = 0; n < addressnum; n++) { // 输出结果 context.write(new Text(name[m]), new Text(address[n])); } } } /* Iterator ite = values.iterator(); while (ite.hasNext()) { String record = ite.next().toString(); int len = record.length(); 
int i = 2; if (0 == len) { continue; } // 取得左右表标识 char relationtype = record.charAt(0); // 左表 if ('1' == relationtype) { name[namenum] = record.substring(i); namenum++; } // 右表 if ('2' == relationtype) { address[addressnum] = record.substring(i); addressnum++; } }*/ // 求笛卡尔积 } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(Join.class); job.setMapperClass(JoinMap.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // job.setReducerClass(JionReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // Path inPath = new Path(args[0]); FileInputFormat.addInputPath(job, inPath); Path outPath = new Path(args[1]); FileSystem fs = FileSystem.get(conf); if (fs.exists(outPath)) { fs.delete(outPath, true); } FileOutputFormat.setOutputPath(job, outPath); job.waitForCompletion(true); } }
执行结果: