Hadoop序列化

  1. 什么是序列化
    序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
    反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。
  2. 为什么要序列化
    一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。
  3. 为什么不用Java的序列化
    Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。
  4. Hadoop序列化特点
    a. 紧凑 :高效使用存储空间。
    b. 快速:读写数据的额外开销小。
    c. 互操作:支持多语言的交互
  5. 实现序列化
    Hadoop的序列化是实现org.apache.hadoop.io.Writable接口,该接口包含readFields(),write()两个方法,代码如下:
package org.apache.hadoop.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public interface Writable {
    void write(DataOutput var1) throws IOException;

    void readFields(DataInput var1) throws IOException;
}

MapReduce中涉及的k1、v1、k2、v2、k3、v3、k4、v4都需要序列化,意味着需要实现Writable接口,并实现readFields()、write()方法。
具体实现对象序列化步骤如下7步:

  1. 必须实现Writable接口
  2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造:
  3. 重写序列化write方法
  4. 重写反序列化readFields方法
  5. 注意反序列化的顺序和序列化的顺序完全一致;
  6. 要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用
  7. 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序

序列化实操案例:
需求:将v2定义为员工信息,类型为Employee。
编写代码:

  1. 编写Employee类
package com.li.mapreduce.salarytotalnew;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class Employee implements Writable {

    private int empno;
    private String ename;
    private String job;
    private int mgr;
    private String hiredate;
    private int sal;
    private int comm;
    private int deptno;

    @Override
    public void readFields(DataInput input) throws IOException {
        // 反序列化
        this.empno = input.readInt();
        this.ename = input.readUTF();
        this.job = input.readUTF();
        this.mgr = input.readInt();
        this.hiredate = input.readUTF();
        this.sal = input.readInt();
        this.comm = input.readInt();
        this.deptno = input.readInt();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // 序列化
        output.writeInt(this.empno);
        output.writeUTF(this.ename);
        output.writeUTF(this.job);
        output.writeInt(this.mgr);
        output.writeUTF(this.hiredate);
        output.writeInt(this.sal);
        output.writeInt(this.comm);
        output.writeInt(this.deptno);
    }

    @Override
    public String toString() {
        return ename + "\t" + job + "\t" + hiredate;
    }


    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public int getSal() {
        return sal;
    }

    public void setSal(int sal) {
        this.sal = sal;
    }

    public int getComm() {
        return comm;
    }

    public void setComm(int comm) {
        this.comm = comm;
    }

    public int getDeptno() {
        return deptno;
    }

    public void setDeptno(int deptno) {
        this.deptno = deptno;
    }
}
  1. 编写Mapper类
package com.li.mapreduce.salarytotalnew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class PartEmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
    @Override
    public void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
        // 数据:7369,SMITH,CLERK,7902,1980/12/17,800,,20
        String data = value1.toString();
        // 分词
        String[] words = data.split(",");
        // 创建员工对象
        Employee e = new Employee();
        // 设置员工的属性
        // 员工号
        e.setEmpno(Integer.parseInt(words[0]));
        // 姓名
        e.setEname(words[1]);
        // 职位
        e.setJob(words[2]);
        // 领导编号
        try {
            e.setMgr(Integer.parseInt(words[3]));
        } catch (Exception ex) {
            // 没有领导编号
            e.setMgr(-1);
        }
        // 入职日期
        e.setHiredate(words[4]);
        // 月薪
        e.setSal(Integer.parseInt(words[5]));
        // 奖金
        try {
            e.setComm(Integer.parseInt(words[6]));
        } catch (Exception ex) {
            e.setComm(0);
        }

        // 部门编号
        e.setDeptno(Integer.parseInt(words[7]));

        // 输出:k2部门号  v2员工对象
        context.write(new IntWritable(e.getDeptno()), //员工的部门号
                e); // 员工对象
    }
}
  1. 编写Reducer类
package com.li.mapreduce.salarytotalnew;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class PartEmployeeReducer extends Reducer<IntWritable, Employee, IntWritable, Employee> {
    @Override
    protected void reduce(IntWritable k3, Iterable<Employee> v3, Context context) throws IOException, InterruptedException {
        /*
        * k3 部门号
        * v3 部门的员工
        */
        for (Employee e : v3) {
            context.write(k3, e);
        }
    }
}
  1. 编写Main类
package com.li.mapreduce.salarytotalnew;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartEmployeeMain {
    public static void main(String[] args) throws Exception {
        //创建一个job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(PartEmployeeMain.class);
        
        //指定job的mapper和输出的类型k2 v2
        job.setMapperClass(PartEmployeeMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Employee.class);
        
        //指定任务的分区规划
        job.setPartitionerClass(MyEmployeePartitioner.class);
        
        // 指定建立几个分区
        job.setNumReduceTasks(3);
        
        // 指定job的reducer和输出的类型k4 v4
        job.setReducerClass(PartEmployeeReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Employee.class);
        
        // 指定job的输入和输出的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[0]));
        
        // 执行任务
        job.waitForCompletion(true);
    }
}
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值