之前在 VMware 里 Ubuntu,直接命令行 nano,现在后悔了,还是直接 Goland 好......
部分参考文章 / 视频👇
目录
🐸整体思路
如何从 0 到 1 实现 Lab1
因为函数中存在嵌套,所以需要多轮思路,比如第一轮定义了 Worker(),里面嵌套了 call(), performMap(),performReduce() 三个函数
那么第二轮就去实现它们
①
- 先定义 rpc.go 的 API
1,定义新类型 TaskType: Map, Reduce, Done 三个 TaskType 常量分别为 1,2,3
2,定义 GetTaskReply:包含 TaskNum 任务编号,MapFile 输入文件,NReduceTasks reduce 任务数量,NMapTasks 输入的文件数量
3,定义 FinishedTaskArgs:包含 TaskNum 任务编号
- 再定义 coordinator.go 的 API
1,(主节点初始化)定义 Coordinator:多线程并发访问,需要加锁 sync.Mutex;读入的文件 mapFiles;map 任务数量 nMapTasks;reduce 任务数量 nReduceTasks;已经被分配的 map, reduce 任务 mapTasksIssued && reduceTasksIssued;已经完成的 map, reduce 任务 mapTasksFinished && reduceTasksFinished;所有 reduce 任务完成 isDone
- 然后是 worker.go 的 API
1,(工作节点初始化)定义 Worker:建立 rpc 连接,并请求任务,根据当前 TaskType 类型,执行 map / reduce 任务。
并在任务完成后通知 coordinator
②
- 先定义 文件重命名 的 3 个函数
1,根据 map && reduce 的 TaskNum 获取中间文件名
2,原子重命名 中间文件
3,原子重命名 最终文件
4,ihash() 计算 reduce 任务编号
③
nMapTasks,读入的文件数量(取决于给了多少个pg-*.txt)
nReduceTasks,reduce 任务数量(10个,自己定)
每个 map 任务,产生 nReduceTasks 个中间文件(因为多少个 reduce 任务自己定的)
每个 reduce 任务,读取 nMapTasks 个中间文件(与当前编号的 reduce 任务对应的中间文件有 nMapTasks 个,这个数量也就是初始读入的文件数量)
进一步解释
假设读入 m 个文件,reduce 任务有 n 个,那么就有 m 个 map 任务,产生 n 个中间文件
一共产生 m*n 个中间文件,每个 reduce 任务读取 m 个中间文件
ihash() 函数确保键值对,平均分配到不同的 reduce 任务中
- 先定义 worker.go 的 API
1,(执行 map 任务)定义 performMap():读取文件;用 map() 转化为 键值对切片
创建临时文件
键值对写入临时文件;原子重命名
map 阶段,具有相同键的值(或者说,根据键的哈希值,分配到对应编号的 reduce 任务中),经过 ihash() 会被写入同一个临时文件,最后也就是写入同一个中间文件
2,(执行 reduce 任务)定义 performReduce():获取对应中间文件的键值对
按键排序;创建临时文件
统计相同键的数量,写入临时文件
原子重命名
为什么 map 阶段不用排序呢,因为 map 阶段收集到的 key 只是子集,此时排序是没有意义的👇结合下图理解,重点是 Shuffle 阶段:每个 reduce 任务包含多个“单词”,也就是不同的键
④
- 先定义 coordinator.go 的 API
1,首先是 Done():所有 reduce 任务是否完成
2,接着是 GetTask():分配 map 或 reduce 任务
传递参数
分配 map 任务
分配 reduce 任务
c.cond.Wait() 避免 worker 节点一直空闲 或 过度竞争任务,后续在 MakeCoordinator() 唤醒
设置结果:reply 是 rpc 结构体,告诉 worker 节点任务已完成;c 是 Coordinator 结构体,告诉 coordinator 节点任务已完成
3,下一步是 MakeCoordinator():创建 Coordinator 实例
设置变量
定期唤醒 goroutine
监听 worker 请求
4,最后是 FinishedTask():标记任务完成;唤醒等待的 worker 节点
⑤
- 先定义 coordinator.go 的 API
1,先是 server():启动 rpc 服务器,监听 worker 请求,以便 Coordinator 分配和管理 mapreduce 任务(server() 在 MakeCoordinator() 中被调用)- 再定义 rpc.go 的 API
1,先是 coordinatorSock():基于当前用户 uid 的唯一文件名,用于进程间通信- 最后定义 worker.go 的 API
1,先是 call():
向 coordinator 发送请求并等待响应。
call() 是 worker 节点发出的,server() 是 coordiantor 节点调用的。
前者用于向主节点发送请求,后者用于启动服务器,监听 worker 的请求。
解释
call() 在 Worker() 中被调用(worker 节点通过 rpc 远程调用 coordinator 的函数)👇
(1)coordinator 通过 rpc.register() 注册了自己
(2)coordinator 暴露了 GetTask() 和 FinishedTask() 等方法
🍉协调者(coordinator.go)实现
以下内容,map 任务有一份,reduce 任务也有一份
1,初始化(MakeCoordinator)
- 协调者初始化时,会创建互斥锁(访问共享资源)和条件变量(同步任务状态)
- 它接受输入的文件列表和 reduce 任务的数量,给每个 map 任务和 reduce 任务初始化 完成状态和分配时间
c.mapFiles = files // map 任务输入文件
c.nMapTasks = len(files) // map 任务数量
c.mapTaskFinished = make([]bool, len(files)) // map 任务完成状态
c.mapTaskIssued = make([]time.Time, len(files)) // map 任务分配时间
2,处理任务请求(GetTask)
分析
- 工作节点调用 GetTask 请求新的任务
- 协调者循环检查 map 任务和 reduce 任务的完成情况,以决定是否有足够的任务分配给请求的工作节点
for { // 循环检查未完成的任务
mapDone := true
// 遍历任务完成状态
for m, done := range c.mapTaskFinished {
if !done { // 任务未完成
// 任务未分配 或 完成时间过长
if c.mapTasksIssued[m].IsZero()
|| time.Since(c.mapTasksIssued[m]).Seconds() > 10 {
reply.TaskType = Map // 任务类型
reply.TaskNum = m // 任务编号
reply.MapFile = c.mapFiles[m] // map文件
c.mapTasksIssued[m] = time.Now() // 新的分配时间
return nil // 任务已分配
}
else {
mapDone = false // 任务已分配且未超时,即未完成
}
}
}
// 还有进行中的任务
if !mapDone {
c.cond.Wait() // 等待条件变量
} else {
break // 所有 map 任务已完成
}
}
c.cond.Wait() 解释
- 函数声明 👉 func (c *Coordinator) GetTask(......),c 就是 Coordinator 结构体
- cond 声明在 Coordinator 中,cond 就是 *sync.Cond
代码中的 sync.Cond.Wait()👇
GetTask() 方法中,sync.Cond.Wait() 被用于实现 MapReduce 调度中的两个关键同步机制:
等待 map 任务完成 和 等待 reduce 任务完成
- 等待 Map 任务完成
MapReduce 框架中,只有在所有 map 任务完成后,reduce 任务才会开始
GetTask() 方法的第一部分,通过循环检查所有 map 任务是否已经完成
如果存在未完成的 map 任务(未分配或者已超时),意味着可以继续分配新的 map 任务
此时 sync.Cond.Wait() 会被调用,当前协程挂起等待
- 挂起等待
sync.Cond.Wait() 使得当前协程在不占用 CPU 资源的情况下等待
它释放与 sync.Cond 关联的互斥锁,让出 CPU 给其他协程,避免无效的忙等待
- 条件变量
sync.Cond.Wait() 的使用依赖于一个条件变量
这个条件变量表示所有 map 任务已完成
当其他协程(执行 map 任务的 worker 节点)调用 FinishedTask() 方法
来标记某个 map 任务已完成时,它会调用 c.cond.Broadcast() 唤醒所有等待中的协程
Golang中如何正确使用条件变量sync.Cond (ieevee.com)
broadcast 用来唤醒 wait 的协程,为了避免死锁,broadcast 应该放在所有 wait 后
3,任务完成反馈(FinishedTask)
- worker 节点完成任务后,会调用 FinishedTask() 通知 coordinator 节点
- coordinator 节点收到通知后,更新任务完成状态,并 broadcast 唤醒正在等待任务的 worker 节点
switch args.TaskType { // 任务类型
case Map: // Map任务
c.mapTaskFinished[args.TaskNum] = true // 标记完成
case Reduce:
c.reduceTasksFinished[args.TaskNum] = true
default:
log.Fatalf("Bad finished task? %s", args.TaskType)
}
c.cond.Broadcast() // 唤醒所有等待的goroutine(worker节点)
🍉工作者(worker.go)实现
1,map 任务执行(performMap)
读取输入的小文件,应用 map 函数
file, err := os.Open(filename) // 打开文件
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file) // 读取文件
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.close()
kva := mapf(filename, string(content)) // map 函数,得到键值对集合
map 处理后的结果,通过哈希函数,将键值对分配给不同的 reduce 任务,并写入临时文件
for _, kv := range kva {
r := ihash(kv.Key) % nReduceTasks // reduce 任务编号
encoders[r].Encode(&kv) // 编码并写入对应临时文件
}
2,reduce 任务执行(performReduce)
shuffle 过程:worker 节点执行 reduce 任务时,会收集中间文件,并按键排序
利用 json 的序列化,将数据写入文件
kva := []KeyValue{}
for m := 0; m < nMapTasks; m++ {
iFilename := getIntermediateFile(m, taskNum) // 获取中间文件名
file, err := os.Open(iFilename) // 打开中间文件
if err != nil {
log.Fatalf("cannot open %v", iFilename)
}
defer file.Close()
dec := json.NewDecoder(file) // 创建 JSON 解码器
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil { // 读取键值对
break // 读取完毕
}
kva = append(kva, kv) // 键值对添加到集合中
}
}
// 按键排序
sort.Sort(ByKey(kva))
reduce 过程:对所有具有相同键的值应用 reduce 函数,同时将结果写入临时文件
// 对所有具有相同键的值应用 reduce 函数一次
keyBegin := 0
for keyBegin < len(kva) {
keyEnd := keyBegin + 1
for keyEnd < len(kva) && kva[keyEnd].Key == kva[keyBegin].Key {
keyEnd++
} // 循环找到所有具有相同键的值 - 因为键已经排序
values := []string{}
for k := keyBegin; k < keyEnd; k++ {
values = append(values, kva[k].Value) // 收集相同键的所有值
}
// 应用 reduce 函数
output := reducef(kva[keyBegin].Key, values)
// 输出写入 reduce 任务临时文件
fmt.Fprintf(tmpFile, "%v %v\n", kva[keyBegin].Key, output) // 格式化写入
// 转到下一个键
keyBegin = keyEnd
}
🍉rpc(rpc.go)实现
定义 rpc 的数据结构
type TaskType int
const (
Map TaskType = 1 // map 任务类型
Reduce TaskType = 2
Done TaskType = 3 // 作业完成状态
)
// 请求任务
type GetTaskArgs struct{}
// 获取任务
type GetTaskReply struct {
TaskType TaskType // 任务类型:map 或 reduce
TaskNum int // 编号
NReduceTasks int // map 输出的文件数量
MapFile string // 输入文件名
NMapTasks int // reduce 输入的中间文件数量
}
coordinator 的 套接字文件名,确保每次运行的文件名都是唯一的
func coordinatorSock() string {
s := "var/tmp/824-mr-" // 套接字文件名前缀
s += strconv.Itoa(os.Getuid()) // 当前用户的 UID 附加到文件名后
return s
}
🚗面试
MapReduce 工作原理
- 将大规模数据处理,分解为两个阶段:map(映射)和 reduce(归约)
- map 阶段,输入数据被分割为多个独立的片段(大文件拆分成小文件),每个片段由一个 worker 节点处理,生成一组中间键值对
- 中间键值对,通过 Shuffle(map 结束后,reduce 开始之前),作为 reduce 任务的输入
- reduce 阶段对相同键的所有值,进行聚合操作,输出最终结果
代码上
①Map 任务由 performMap() 实现,读取输入文件,应用 mapf() 函数,结果写入临时文件②Reduce 任务由 performReduce()实现,读取中间文件,按键排序,应用 reducef() 函数,结果写入最终文件
关于 Shuffle👇
- Map(映射)的输出
①map 阶段处理输入文件,生成中间结果的键值对,键值对先在内存中累积- Spill(溢写)
①当 map 阶段的内存缓冲区达到阈值,数据会从内存溢写(spill)到磁盘,形成 spill 文件
②溢写之前,数据会根据 key 排序- Merge(合并)
①当 map 阶段所有数据处理完后,会有多个磁盘上的 spill 文件
②为了减少磁盘 I/O 操作,这些 spill 文件会合并成一个大文件,并且再次排序,合并,以减少传输给 reduce 任务的数据量- Copy(拷贝)
①在 reduce 阶段开始后,处理 reduce 任务的 worker 节点,会从处理 map 任务的节点拷贝所需的排序后的数据- Merge Sort(归并排序)
①reduce 阶段拷贝数据的同时,会进行归并排序,确保所有来自 map 阶段的数据在 reduce 阶段是有序的
MapReduce 如何保证数据一致性和完整性
结合代码分析
- 原子提交
mapreduce 保证每个任务的输出(map 或 reduce),在最终提交前,都是原子性的。
因为在任务完成时,它的输出会一次性写入文件系统
worker.go 中,performMap() 和 performReduce() 两个函数,输出结果都是先写入临时文件,接着才用 finalizeIntermediateFile() 和 finalizeReduceFile() 函数原子地重命名这些文件
// 原子重命名:临时文件 --> 中间文件
finalizeIntermediateFile(tmpFilenames[r], taskNum, r)
// 原子重命名:reduce文件 --> 最终文件
finalizeReduceFile(tmpFilename, taskNum)
- 故障恢复
coordinator.go 中,GetTask() 函数确保 map 任务在超时 10s 后重新分配,有助于处理节点崩溃的问题
// 如果任务从未分配过,或者自从分配以来时间太长,可能worker已崩溃,需要重新分配
if c.mapTasksIssued[m].IsZero() || time.Since(c.mapTasksIssued[m]).Seconds() > 10 {
reply.TaskType = Map // 设置任务类型为Map
reply.TaskNum = m // 设置任务编号
reply.MapFile = c.mapFiles[m] // 设置map文件
c.mapTasksIssued[m] = time.Now() // 更新任务分配时间
return nil // 返回nil表示没有错误
}
MapReduce 优点
优点
- 接口简单
mapreduce 提供了 map 和 reduce 两个核心函数,只需实现这两个函数
- 良好的扩展性
处理大规模数据时,可以增加更多的计算节点,通过多节点并行处理,提高速度
- 高容错
分布式系统中,节点故障很常见。
mapreduce在不同节点上存储了多个数据副本,如果任务失败,会自动重新调度
- 处理大规模数据
大规模数据,如日志分析,网页索引。
比如 Google 每天需要处理 5 个pb的数据,使用 mapreduce 可以有效地进行索引和分析
mapreduce 如何通信
通信机制
- RPC 调用
使用 go 语言的 net/rpc 包实现,worker 节点和 coordinator 节点之间通过 rpc 通信- 锁和条件变量
coordinator 中使用互斥锁和条件变量,来同步对任务状态的访问和更新- HTTP 服务
coordinator 通过 HTTP 服务监听来自 worker 节点的 RPC 请求
定义 RPC 接口
- 在
rpc.go
文件中,我们定义了 RPC 通信需要用到的接口和数据结构。这包括任务类型(Map、Reduce、Done),以及用于任务请求和任务完成通知的结构体
type TaskType int
const (
Map TaskType = 1
Reduce TaskType = 2
Done TaskType = 3
)
type GetTaskArgs struct{}
type GetTaskReply struct {
TaskType TaskType
TaskNum int
NReduceTasks int
MapFile string
NMapTasks int
}
type FinishedTaskArgs struct {
TaskType TaskType
TaskNum int
}
type FinishedTaskReply struct{}
启动协调器的 RPC 服务器
- 在
coordinator.go
文件中,协调器启动了一个 RPC 服务器来监听工作节点的请求。这个服务器是通过注册协调器对象和处理 HTTP 请求来实现的
func (c *Coordinator) server() {
rpc.Register(c)
rpc.HandleHTTP()
sockname := coordinatorSock()
os.Remove(sockname)
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
}
worker 节点请求任务
- 在
worker.go
文件中,工作节点通过调用call
函数向协调器发送 RPC 请求,获取分配的任务。这个函数会根据回复中的信息执行相应的 Map 或 Reduce 任务
for {
args := GetTaskArgs{}
reply := GetTaskReply{}
call("Coordinator.GetTask", &args, &reply)
switch reply.TaskType {
case Map:
performMap(reply.MapFile, reply.TaskNum, reply.NReduceTasks, mapf)
case Reduce:
performReduce(reply.TaskNum, reply.NMapTasks, reducef)
case Done:
os.Exit(0)
default:
fmt.Errorf("Bad task type? %s", reply.TaskType)
}
finArgs := FinishedTaskArgs{
TaskType: reply.TaskType,
TaskNum: reply.TaskNum,
}
finReply := FinishedTaskReply{}
call("Coordinator.FinishedTask", &finArgs, &finReply)
}
协调器完成任务处理
- 当工作节点完成一个任务后,它会通过调用
call
函数通知协调器。协调器接收到这个通知后,会更新任务状态,并可能唤醒其他等待任务的工作节点
func (c *Coordinator) FinishedTask(args *FinishedTaskArgs, reply *FinishedTaskReply) error {
c.mu.Lock()
defer c.mu.Unlock()
switch args.TaskType {
case Map:
c.mapTasksFinished[args.TaskNum] = true
case Reduce:
c.reduceTasksFinished[args.TaskNum] = true
default:
log.Fatalf("Bad finished task? %s", args.TaskType)
}
c.cond.Broadcast()
return nil
}