join合并

mapredecu做文件合并

这些代码想了一天也百思不得其解,希望留下来求大佬指点迷津


import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.GenericOptionsParser;
public class MRJoin {
    public static class MR_Join_Mapper extends Mapper<LongWritable, Text, TextPair, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 获取输入文件的全路径和名称
            String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
            if (pathName.contains("shuju.txt")) {
                String values[] = value.toString().split("\t");
                if (values.length < 3) {
                    // data数据格式不规范,字段小于3,抛弃数据
                    return;
                } else {
                    // 数据格式规范,区分标识为1
                    TextPair tp = new TextPair(new Text(values[1]), new Text("1"));
                    context.write(tp, new Text(values[0] + "\t" + values[2]));
                    //System.out.println(tp.getSecond().toString());
                }
            }
            if (pathName.contains("shuju1.txt")) {
                String values[] = value.toString().split("\t");
                if (values.length < 2) {
                    // data数据格式不规范,字段小于2,抛弃数据
                    return;
                } else {
                    // 数据格式规范,区分标识为0
                    TextPair tp = new TextPair(new Text(values[0]), new Text("0"));
                    context.write(tp, new Text(values[1]));
                }
            }
        }
    }
    public static class MR_Join_Partitioner extends Partitioner<TextPair, Text> {
        @Override
        public int getPartition(TextPair key, Text value, int numParititon) {
            return Math.abs(key.getFirst().hashCode() * 127) % numParititon;
        }
    }
    public static class MR_Join_Comparator extends WritableComparator {
        public MR_Join_Comparator() {
            super(TextPair.class,   true);
        }

        public int compare(WritableComparable a, WritableComparable b) {
            TextPair t1 = (TextPair) a;
            TextPair t2 = (TextPair) b;
            return t1.getFirst().compareTo(t2.getFirst());
        }


    }
    public static class MR_Join_Reduce extends Reducer<TextPair, Text, Text, Text> {
        protected void reduce(TextPair key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Text pid = key.getFirst();
            String desc = values.iterator().next().toString();
            while (values.iterator().hasNext()) {
                context.write(pid, new Text(values.iterator().next().toString() + "\t" + desc));
            }

        }
    }

    public static void main(String args[])
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        //GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        //String[] otherArgs = parser.getRemainingArgs();

        if (args.length < 3) {
            System.err.println("Usage: MRJoin <in_path_one> <in_path_two> <output>");
            System.exit(2);
        }

        //System.out.println(args[2]);

        Job job = new Job(conf);
        // 设置运行的job
        job.setJarByClass(MRJoin.class);
        // 设置Map相关内容]
        job.setMapperClass(MR_Join_Mapper.class);
        // 设置Map的输出
        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(Text.class);
        // 设置partition
        job.setPartitionerClass(MR_Join_Partitioner.class);
        // 在分区之后按照指定的条件分组
        job.setGroupingComparatorClass(MR_Join_Comparator.class);
        // 设置Reduce
        job.setReducerClass(MR_Join_Reduce.class);
        // 设置Reduce的输出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 设置输入和输出的目录
        File file = new File("E:\\data\\res9");
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        FileInputFormat.addInputPath(job, new Path("E:\\data\\shuju.txt"));
        FileInputFormat.addInputPath(job, new Path("E:\\data\\shuju1.txt"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\data\\res9"));

        // 执行,直到结束就退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

class TextPair implements WritableComparable<TextPair> {
    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }


    public int compareTo(TextPair tp) {
        //System.out.println(first);
        //System.out.println("------------");
        //System.out.println(tp.first);
        //System.out.println("------------------");
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            //System.out.println(cmp);
            return cmp;
        }
        return second.compareTo(tp.second);
    }
}

这个代码现在看不懂,希望以后回来的时候可以看懂它。
附上附件
shuju1.txt:

1003	kaka
1004	da
1005	jue
1006	zhao

shuju.txt:

201001	1003	abc
201002	1005	def
201003	1006	ghi
201004	1003	jkl
201005	1004	mno
201006	1005	pqr

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值