MapReduce多表关联实测

本文通过一个实例演示如何使用 MapReduce 实现两个表的关联(reduce 端 join):a 表包含 name 和 id 两个字段,b 表包含 id 和 address 两个字段。程序按 id 将两表关联,输出每个 name 对应的 address。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

两个表:a 表的字段为 name、id;b 表的字段为 id、address。输入为逗号分隔的文本文件,首行为表头,两表通过 id 字段关联。

(原文此处的示例数据未能保留。)

代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class Join {
    private static class JoinMap extends Mapper<Object, Text, Text, Text> {

        protected void map(Object key, Text values, Context context) throws IOException, InterruptedException {
            String line = values.toString();
            if (line.contains("name") == true || line.contains("id")) {
                return;
            }
            String[] str = line.split(",");
            String flag = new String();
            if (str[0].length() == 1) {
                flag = "2";
                context.write(new Text(str[0]), new Text(flag + "+" + str[1]));
            } else if (str[0].length() > 1) {
                flag = "1";
                context.write(new Text(str[1]), new Text(flag + "+" + str[0]));
            }
        }
    }

    private static class JionReduce extends Reducer<Text, Text, Text, Text> {
        // 实现reduce函数
        private int time = 0;

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // 输出表头
            if (0 == time) {
                context.write(new Text("name"), new Text("id"));
                time++;
            }
            int namenum = 0;
            String[] name = new String[10];
            int addressnum = 0;
            String[] address = new String[10];

            for(Text value:values) {
                String[] str2 = value.toString().split("\\+");
                if (str2[0].compareTo("1") == 0) {
                    name[namenum] = str2[1];
                    namenum++;
                }
                if (str2[0].compareTo("2") == 0) {
                    address[addressnum] = str2[1];
                    addressnum++;
                }
            }
            //循环必须正确
            if (0 != namenum && 0 != addressnum) {
                for (int m = 0; m < namenum; m++) {
                    for (int n = 0; n < addressnum; n++) {
                        // 输出结果
                        context.write(new Text(name[m]), new Text(address[n]));
                    }

                }

            }

           /* Iterator ite = values.iterator();

            while (ite.hasNext()) {

                String record = ite.next().toString();

                int len = record.length();

                int i = 2;

                if (0 == len) {

                    continue;
                }
                // 取得左右表标识
                char relationtype = record.charAt(0);
                // 左表
                if ('1' == relationtype) {
                    name[namenum] = record.substring(i);
                    namenum++;
                }
                // 右表
                if ('2' == relationtype) {
                    address[addressnum] = record.substring(i);
                    addressnum++;

                }

            }*/
            // 求笛卡尔积


        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Join.class);

        job.setMapperClass(JoinMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        //
        job.setReducerClass(JionReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //
        Path inPath = new Path(args[0]);

        FileInputFormat.addInputPath(job, inPath);

        Path outPath = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
        job.waitForCompletion(true);
    }
}

执行结果:(原文此处的运行结果截图未能保留。)

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值