- 什么是序列化
序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。 - 为什么要序列化
一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。 然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。 - 为什么不用Java的序列化
Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。 - Hadoop序列化特点
a. 紧凑 :高效使用存储空间。
b. 快速:读写数据的额外开销小。
c. 互操作:支持多语言的交互 - 实现序列化
Hadoop的序列化是实现org.apache.hadoop.io.Writable接口,该接口包含readFields(),write()两个方法,代码如下:
package org.apache.hadoop.io;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@Public
@Stable
public interface Writable {
void write(DataOutput var1) throws IOException;
void readFields(DataInput var1) throws IOException;
}
MapReduce中涉及的k1、v1、k2、v2、k3、v3、k4、v4都需要序列化,意味着需要实现Writable接口,并实现readFields()、write()方法。
具体实现对象序列化步骤如下7步:
- 必须实现Writable接口
- 反序列化时,需要反射调用空参构造函数,所以必须有空参构造:
- 重写序列化write方法
- 重写反序列化readFields方法
- 注意反序列化的顺序和序列化的顺序完全一致;
- 要想把结果显示在文件中,需要重写toString(),可用"\t"分开,方便后续用
- 如果需要将自定义的bean放在key中传输,则还需要实现Comparable接口,因为MapReduce框中的Shuffle过程要求对key必须能排序
序列化实操案例:
需求:将v2定义为员工信息,类型为Employee。
编写代码:
- 编写Employee类
package com.li.mapreduce.salarytotalnew;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Employee implements Writable {
private int empno;
private String ename;
private String job;
private int mgr;
private String hiredate;
private int sal;
private int comm;
private int deptno;
@Override
public void readFields(DataInput input) throws IOException {
// 反序列化
this.empno = input.readInt();
this.ename = input.readUTF();
this.job = input.readUTF();
this.mgr = input.readInt();
this.hiredate = input.readUTF();
this.sal = input.readInt();
this.comm = input.readInt();
this.deptno = input.readInt();
}
@Override
public void write(DataOutput output) throws IOException {
// 序列化
output.writeInt(this.empno);
output.writeUTF(this.ename);
output.writeUTF(this.job);
output.writeInt(this.mgr);
output.writeUTF(this.hiredate);
output.writeInt(this.sal);
output.writeInt(this.comm);
output.writeInt(this.deptno);
}
@Override
public String toString() {
return ename + "\t" + job + "\t" + hiredate;
}
public int getEmpno() {
return empno;
}
public void setEmpno(int empno) {
this.empno = empno;
}
public String getEname() {
return ename;
}
public void setEname(String ename) {
this.ename = ename;
}
public String getJob() {
return job;
}
public void setJob(String job) {
this.job = job;
}
public int getMgr() {
return mgr;
}
public void setMgr(int mgr) {
this.mgr = mgr;
}
public String getHiredate() {
return hiredate;
}
public void setHiredate(String hiredate) {
this.hiredate = hiredate;
}
public int getSal() {
return sal;
}
public void setSal(int sal) {
this.sal = sal;
}
public int getComm() {
return comm;
}
public void setComm(int comm) {
this.comm = comm;
}
public int getDeptno() {
return deptno;
}
public void setDeptno(int deptno) {
this.deptno = deptno;
}
}
- 编写Mapper类
package com.li.mapreduce.salarytotalnew;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class PartEmployeeMapper extends Mapper<LongWritable, Text, IntWritable, Employee> {
@Override
public void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException {
// 数据:7369,SMITH,CLERK,7902,1980/12/17,800,,20
String data = value1.toString();
// 分词
String[] words = data.split(",");
// 创建员工对象
Employee e = new Employee();
// 设置员工的属性
// 员工号
e.setEmpno(Integer.parseInt(words[0]));
// 姓名
e.setEname(words[1]);
// 职位
e.setJob(words[2]);
// 领导编号
try {
e.setMgr(Integer.parseInt(words[3]));
} catch (Exception ex) {
// 没有领导编号
e.setMgr(-1);
}
// 入职日期
e.setHiredate(words[4]);
// 月薪
e.setSal(Integer.parseInt(words[5]));
// 奖金
try {
e.setComm(Integer.parseInt(words[6]));
} catch (Exception ex) {
e.setComm(0);
}
// 部门编号
e.setDeptno(Integer.parseInt(words[7]));
// 输出:k2部门号 v2员工对象
context.write(new IntWritable(e.getDeptno()), //员工的部门号
e); // 员工对象
}
}
- 编写Reducer类
package com.li.mapreduce.salarytotalnew;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class PartEmployeeReducer extends Reducer<IntWritable, Employee, IntWritable, Employee> {
@Override
protected void reduce(IntWritable k3, Iterable<Employee> v3, Context context) throws IOException, InterruptedException {
/*
* k3 部门号
* v3 部门的员工
*/
for (Employee e : v3) {
context.write(k3, e);
}
}
}
- 编写Main类
package com.li.mapreduce.salarytotalnew;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class PartEmployeeMain {
public static void main(String[] args) throws Exception {
//创建一个job
Job job = Job.getInstance(new Configuration());
job.setJarByClass(PartEmployeeMain.class);
//指定job的mapper和输出的类型k2 v2
job.setMapperClass(PartEmployeeMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Employee.class);
//指定任务的分区规划
job.setPartitionerClass(MyEmployeePartitioner.class);
// 指定建立几个分区
job.setNumReduceTasks(3);
// 指定job的reducer和输出的类型k4 v4
job.setReducerClass(PartEmployeeReducer.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(Employee.class);
// 指定job的输入和输出的路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[0]));
// 执行任务
job.waitForCompletion(true);
}
}