6.824 MapReduce 代码分析

之前在 VMware 里 Ubuntu,直接命令行 nano,现在后悔了,还是直接 Goland 好......

部分参考文章 / 视频👇

MIT 6.824 Lab 1 MapReduce详细实现思路及过程_mit 6.824 lab1-CSDN博客

mit6.824分布式lab1-MapReduce(1)_哔哩哔哩_bilibili

目录

🐸整体思路 

🍉协调者(coordinator.go)实现

1,初始化(MakeCoordinator)

2,处理任务请求(GetTask)

分析

c.cond.Wait() 解释

3,任务完成反馈(FinishedTask)

🍉工作者(worker.go)实现

1,map 任务执行(performMap)

2,reduce 任务执行(performReduce)

🍉rpc(rpc.go)实现

🚗面试

MapReduce 工作原理

MapReduce 如何保证数据一致性和完整性

MapReduce 优点

mapreduce 如何通信


🐸整体思路 

如何从 0 到 1 实现 Lab1 

因为函数中存在嵌套,所以需要多轮思路,比如第一轮定义了 Worker(),里面嵌套了 call(), performMap(),performReduce() 三个函数

那么第二轮就去实现它们

  • 先定义 rpc.go 的 API
    1,定义新类型 TaskType: Map, Reduce, Done 三个 TaskType 常量分别为 1,2,3

    2,定义 GetTaskReply:包含 TaskNum 任务编号,MapFile 输入文件,NReduceTasks reduce 任务数量,NMapTasks 输入的文件数量

    3,定义 FinishedTaskArgs:包含 TaskNum 任务编号
  • 再定义 coordinator.go 的 API
    1,(主节点初始化)定义 Coordinator:多线程并发访问,需要加锁 sync.Mutex;读入的文件 mapFiles;map 任务数量 nMapTasks;reduce 任务数量 nReduceTasks;已经被分配的 map, reduce 任务 mapTasksIssued && reduceTasksIssued;已经完成的 map, reduce 任务 mapTasksFinished && reduceTasksFinished;所有 reduce 任务完成 isDone
  • 然后是 worker.go 的 API
    1,(工作节点初始化)定义 Worker:建立 rpc 连接,并请求任务,根据当前 TaskType 类型,执行 map / reduce 任务。

    并在任务完成后通知 coordinator

  • 先定义 文件重命名 的 3 个函数
    1,根据 map && reduce 的 TaskNum 获取中间文件名

    2,原子重命名 中间文件

    3,原子重命名 最终文件

    4,ihash() 计算 reduce 任务编号

nMapTasks,读入的文件数量(取决于给了多少个pg-*.txt)

nReduceTasks,reduce 任务数量(10个,自己定)

每个 map 任务,产生 nReduceTasks 个中间文件(因为多少个 reduce 任务自己定的)

每个 reduce 任务,读取 nMapTasks 个中间文件(与当前编号的 reduce 任务对应的中间文件有 nMapTasks 个,这个数量也就是初始读入的文件数量)

进一步解释
假设读入 m 个文件,reduce 任务有 n 个,那么就有 m 个 map 任务,产生 n 个中间文件
一共产生 m*n 个中间文件,每个 reduce 任务读取 m 个中间文件

ihash() 函数确保键值对,平均分配到不同的 reduce 任务中

  • 先定义 worker.go 的 API
    1,(执行 map 任务)定义 performMap():读取文件;用 map() 转化为 键值对切片

    创建临时文件

    键值对写入临时文件;原子重命名
    map 阶段,具有相同键的值(或者说,根据键的哈希值,分配到对应编号的 reduce 任务中),经过 ihash() 会被写入同一个临时文件,最后也就是写入同一个中间文件

  • 2,(执行 reduce 任务)定义 performReduce():获取对应中间文件的键值对

    按键排序;创建临时文件

    统计相同键的数量,写入临时文件

    原子重命名

为什么 map 阶段不用排序呢,因为 map 阶段收集到的 key 只是子集,此时排序是没有意义的👇结合下图理解,重点是 Shuffle 阶段:每个 reduce 任务包含多个“单词”,也就是不同的键

  • 先定义 coordinator.go 的 API
    1,首先是 Done():所有 reduce 任务是否完成

    2,接着是 GetTask():分配 map 或 reduce 任务
    传递参数

    分配 map 任务

    分配 reduce 任务
    c.cond.Wait() 避免 worker 节点一直空闲 或 过度竞争任务,后续在 MakeCoordinator() 唤醒

    设置结果:reply 是 rpc 结构体,告诉 worker 节点任务已完成;c 是 Coordinator 结构体,告诉 coordinator 节点任务已完成

    3,下一步是 MakeCoordinator():创建 Coordinator 实例
    设置变量

    定期唤醒 goroutine

    监听 worker 请求

    4,最后是 FinishedTask():标记任务完成;唤醒等待的 worker 节点

  • 先定义 coordinator.go 的 API
    1,先是 server():启动 rpc 服务器,监听 worker 请求,以便 Coordinator 分配和管理 mapreduce 任务(server() 在 MakeCoordinator() 中被调用)
  • 再定义 rpc.go 的 API
    1,先是 coordinatorSock():基于当前用户 uid 的唯一文件名,用于进程间通信
  • 最后定义 worker.go 的 API
    1,先是 call()
    向 coordinator 发送请求并等待响应。
    call() 是 worker 节点发出的,server() 是 coordiantor 节点调用的。
    前者用于向主节点发送请求,后者用于启动服务器,监听 worker 的请求。

    解释
    call() 在 Worker() 中被调用(worker 节点通过 rpc 远程调用 coordinator 的函数)👇


    (1)coordinator 通过 rpc.register() 注册了自己
    (2)coordinator 暴露了 GetTask() 和 FinishedTask() 等方法

🍉协调者(coordinator.go)实现

以下内容,map 任务有一份,reduce 任务也有一份 

1,初始化(MakeCoordinator)

  • 协调者初始化时,会创建互斥锁(访问共享资源)和条件变量(同步任务状态)
  • 它接受输入的文件列表和 reduce 任务的数量,给每个 map 任务和 reduce 任务初始化 完成状态和分配时间
c.mapFiles = files // map 任务输入文件
c.nMapTasks = len(files) // map 任务数量
c.mapTaskFinished = make([]bool, len(files)) // map 任务完成状态
c.mapTaskIssued = make([]time.Time, len(files)) // map 任务分配时间

2,处理任务请求(GetTask)

分析

  • 工作节点调用 GetTask 请求新的任务
  • 协调者循环检查 map 任务和 reduce 任务的完成情况,以决定是否有足够的任务分配给请求的工作节点
for { // 循环检查未完成的任务
    mapDone := true
    // 遍历任务完成状态
    for m, done := range c.mapTaskFinished {
        if !done { // 任务未完成
            // 任务未分配 或 完成时间过长
            if c.mapTasksIssued[m].IsZero() 
            || time.Since(c.mapTasksIssued[m]).Seconds() > 10 {

                reply.TaskType = Map // 任务类型
                reply.TaskNum = m // 任务编号
                reply.MapFile = c.mapFiles[m] // map文件
                c.mapTasksIssued[m] = time.Now() // 新的分配时间
                return nil // 任务已分配    
            }
            else {
                mapDone = false // 任务已分配且未超时,即未完成
            }
        }
    }
    // 还有进行中的任务
    if !mapDone {
        c.cond.Wait() // 等待条件变量
    } else {
        break // 所有 map 任务已完成
    }
}

c.cond.Wait() 解释

  • 函数声明 👉 func (c *Coordinator) GetTask(......),c 就是 Coordinator 结构体
  • cond 声明在 Coordinator 中,cond 就是 *sync.Cond

代码中的 sync.Cond.Wait()👇

GetTask() 方法中,sync.Cond.Wait() 被用于实现 MapReduce 调度中的两个关键同步机制:

等待 map 任务完成等待 reduce 任务完成

  1. 等待 Map 任务完成

MapReduce 框架中,只有在所有 map 任务完成后,reduce 任务才会开始

GetTask() 方法的第一部分,通过循环检查所有 map 任务是否已经完成

如果存在未完成的 map 任务(未分配或者已超时),意味着可以继续分配新的 map 任务

此时 sync.Cond.Wait() 会被调用,当前协程挂起等待

  • 挂起等待

sync.Cond.Wait() 使得当前协程在不占用 CPU 资源的情况下等待

它释放与 sync.Cond 关联的互斥锁,让出 CPU 给其他协程,避免无效的忙等待

  • 条件变量

sync.Cond.Wait() 的使用依赖于一个条件变量

这个条件变量表示所有 map 任务已完成

当其他协程(执行 map 任务的 worker 节点)调用 FinishedTask() 方法

来标记某个 map 任务已完成时,它会调用 c.cond.Broadcast() 唤醒所有等待中的协程

Golang中如何正确使用条件变量sync.Cond (ieevee.com)

broadcast 用来唤醒 wait 的协程,为了避免死锁,broadcast 应该放在所有 wait 后

3,任务完成反馈(FinishedTask)

  • worker 节点完成任务后,会调用 FinishedTask() 通知 coordinator 节点
  • coordinator 节点收到通知后,更新任务完成状态,并 broadcast 唤醒正在等待任务的 worker 节点
switch args.TaskType { // 任务类型
case Map: // Map任务
    c.mapTaskFinished[args.TaskNum] = true // 标记完成
case Reduce: 
    c.reduceTasksFinished[args.TaskNum] = true
default: 
    log.Fatalf("Bad finished task? %s", args.TaskType) 
}

c.cond.Broadcast() // 唤醒所有等待的goroutine(worker节点)

🍉工作者(worker.go)实现

1,map 任务执行(performMap)

读取输入的小文件,应用 map 函数

file, err := os.Open(filename) // 打开文件
if err != nil {
    log.Fatalf("cannot open %v", filename)
}

content, err := ioutil.ReadAll(file) // 读取文件
if err != nil {
    log.Fatalf("cannot read %v", filename)
}
file.close()

kva := mapf(filename, string(content)) // map 函数,得到键值对集合

map 处理后的结果,通过哈希函数,将键值对分配给不同的 reduce 任务,并写入临时文件

for _, kv := range kva {
    r := ihash(kv.Key) % nReduceTasks // reduce 任务编号
    encoders[r].Encode(&kv) // 编码并写入对应临时文件
}

2,reduce 任务执行(performReduce)

shuffle 过程:worker 节点执行 reduce 任务时,会收集中间文件,并按键排序

利用 json 的序列化,将数据写入文件

kva := []KeyValue{}

for m := 0; m < nMapTasks; m++ {
    iFilename := getIntermediateFile(m, taskNum) // 获取中间文件名
    file, err := os.Open(iFilename) // 打开中间文件
    if err != nil {
        log.Fatalf("cannot open %v", iFilename)
    }
    defer file.Close()

    dec := json.NewDecoder(file) // 创建 JSON 解码器
    for {
        var kv KeyValue
        if err := dec.Decode(&kv); err != nil { // 读取键值对
            break // 读取完毕
        }
        kva = append(kva, kv) // 键值对添加到集合中
    }
}

// 按键排序
sort.Sort(ByKey(kva)) 

reduce 过程:对所有具有相同键的值应用 reduce 函数,同时将结果写入临时文件

// 对所有具有相同键的值应用 reduce 函数一次
keyBegin := 0
for keyBegin < len(kva) {
    keyEnd := keyBegin + 1
    for keyEnd < len(kva) && kva[keyEnd].Key == kva[keyBegin].Key {
        keyEnd++
    } // 循环找到所有具有相同键的值 - 因为键已经排序

    values := []string{}
    for k := keyBegin; k < keyEnd; k++ {
        values = append(values, kva[k].Value) // 收集相同键的所有值
    }
    
    // 应用 reduce 函数
    output := reducef(kva[keyBegin].Key, values)

    // 输出写入 reduce 任务临时文件
    fmt.Fprintf(tmpFile, "%v %v\n", kva[keyBegin].Key, output) // 格式化写入

    // 转到下一个键
    keyBegin = keyEnd
}

🍉rpc(rpc.go)实现

定义 rpc 的数据结构

type TaskType int

const (
    Map  TaskType = 1 // map 任务类型
    Reduce  TaskType = 2
    Done  TaskType = 3 // 作业完成状态
)

// 请求任务
type GetTaskArgs struct{}

// 获取任务
type GetTaskReply struct {
    TaskType TaskType // 任务类型:map 或 reduce
    TaskNum int // 编号
    NReduceTasks int // map 输出的文件数量
    MapFile string // 输入文件名
    NMapTasks int // reduce 输入的中间文件数量
}

coordinator 的 套接字文件名,确保每次运行的文件名都是唯一的

func coordinatorSock() string {
    s := "var/tmp/824-mr-" // 套接字文件名前缀
    s += strconv.Itoa(os.Getuid()) // 当前用户的 UID 附加到文件名后
    return s
}

🚗面试

MapReduce 工作原理

  • 将大规模数据处理,分解为两个阶段:map(映射)和 reduce(归约)
  • map 阶段,输入数据被分割为多个独立的片段(大文件拆分成小文件),每个片段由一个 worker 节点处理,生成一组中间键值对
  • 中间键值对,通过 Shuffle(map 结束后,reduce 开始之前),作为 reduce 任务的输入
  • reduce 阶段对相同键的所有值,进行聚合操作,输出最终结果

代码上 

①Map 任务由 performMap() 实现,读取输入文件,应用 mapf() 函数,结果写入临时文件②Reduce 任务由 performReduce()实现,读取中间文件,按键排序,应用 reducef() 函数,结果写入最终文件 

关于 Shuffle👇

  1. Map(映射)的输出
    ①map 阶段处理输入文件,生成中间结果的键值对,键值对先在内存中累积
  2. Spill(溢写)
    ①当 map 阶段的内存缓冲区达到阈值,数据会从内存溢写(spill)到磁盘,形成 spill 文件
    ②溢写之前,数据会根据 key 排序
  3. Merge(合并)
    ①当 map 阶段所有数据处理完后,会有多个磁盘上的 spill 文件
    ②为了减少磁盘 I/O 操作,这些 spill 文件会合并成一个大文件,并且再次排序,合并,以减少传输给 reduce 任务的数据量
  4. Copy(拷贝)
    ①在 reduce 阶段开始后,处理 reduce 任务的 worker 节点,会从处理 map 任务的节点拷贝所需的排序后的数据
  5. Merge Sort(归并排序)
    ①reduce 阶段拷贝数据的同时,会进行归并排序,确保所有来自 map 阶段的数据在 reduce 阶段是有序的 

MapReduce 如何保证数据一致性和完整性

结合代码分析 

  • 原子提交

mapreduce 保证每个任务的输出(map 或 reduce),在最终提交前,都是原子性的。

因为在任务完成时,它的输出会一次性写入文件系统

worker.go 中,performMap() 和 performReduce() 两个函数,输出结果都是先写入临时文件,接着才用 finalizeIntermediateFile() 和 finalizeReduceFile() 函数原子地重命名这些文件

// 原子重命名:临时文件 --> 中间文件
finalizeIntermediateFile(tmpFilenames[r], taskNum, r)

// 原子重命名:reduce文件 --> 最终文件
finalizeReduceFile(tmpFilename, taskNum)
  •  故障恢复

coordinator.go 中,GetTask() 函数确保 map 任务在超时 10s 后重新分配,有助于处理节点崩溃的问题

// 如果任务从未分配过,或者自从分配以来时间太长,可能worker已崩溃,需要重新分配
if c.mapTasksIssued[m].IsZero() || time.Since(c.mapTasksIssued[m]).Seconds() > 10 {
    reply.TaskType = Map // 设置任务类型为Map
    reply.TaskNum = m // 设置任务编号
    reply.MapFile = c.mapFiles[m] // 设置map文件
    c.mapTasksIssued[m] = time.Now() // 更新任务分配时间
    return nil // 返回nil表示没有错误
}

MapReduce 优点

优点

  • 接口简单

mapreduce 提供了 map 和 reduce 两个核心函数,只需实现这两个函数

  • 良好的扩展性

处理大规模数据时,可以增加更多的计算节点,通过多节点并行处理,提高速度

  • 高容错

分布式系统中,节点故障很常见。

mapreduce在不同节点上存储了多个数据副本,如果任务失败,会自动重新调度

  • 处理大规模数据

大规模数据,如日志分析,网页索引。

比如 Google 每天需要处理 5 个pb的数据,使用 mapreduce 可以有效地进行索引和分析

mapreduce 如何通信

通信机制

  • RPC 调用
    使用 go 语言的 net/rpc 包实现,worker 节点和 coordinator 节点之间通过 rpc 通信
  • 锁和条件变量
    coordinator 中使用互斥锁和条件变量,来同步对任务状态的访问和更新
  • HTTP 服务
    coordinator 通过 HTTP 服务监听来自 worker 节点的 RPC 请求

定义 RPC 接口

  • rpc.go 文件中,我们定义了 RPC 通信需要用到的接口和数据结构。这包括任务类型(Map、Reduce、Done),以及用于任务请求和任务完成通知的结构体
type TaskType int

const (
    Map  TaskType = 1
    Reduce  TaskType = 2
    Done  TaskType = 3
)

type GetTaskArgs struct{}
type GetTaskReply struct {
    TaskType TaskType
    TaskNum int
    NReduceTasks int
    MapFile string
    NMapTasks int
}

type FinishedTaskArgs struct {
    TaskType TaskType
    TaskNum int
}
type FinishedTaskReply struct{}

启动协调器的 RPC 服务器

  • coordinator.go 文件中,协调器启动了一个 RPC 服务器来监听工作节点的请求。这个服务器是通过注册协调器对象和处理 HTTP 请求来实现的
func (c *Coordinator) server() {
    rpc.Register(c)
    rpc.HandleHTTP()
    sockname := coordinatorSock()
    os.Remove(sockname)
    l, e := net.Listen("unix", sockname)
    if e != nil {
        log.Fatal("listen error:", e)
    }
    go http.Serve(l, nil)
}

worker 节点请求任务

  • worker.go 文件中,工作节点通过调用 call 函数向协调器发送 RPC 请求,获取分配的任务。这个函数会根据回复中的信息执行相应的 Map 或 Reduce 任务
for {
    args := GetTaskArgs{}
    reply := GetTaskReply{}

    call("Coordinator.GetTask", &args, &reply)

    switch reply.TaskType {
    case Map:
        performMap(reply.MapFile, reply.TaskNum, reply.NReduceTasks, mapf)
    case Reduce:
        performReduce(reply.TaskNum, reply.NMapTasks, reducef)
    case Done:
        os.Exit(0)
    default:
        fmt.Errorf("Bad task type? %s", reply.TaskType)
    }

    finArgs := FinishedTaskArgs{
        TaskType: reply.TaskType,
        TaskNum:  reply.TaskNum,
    }
    finReply := FinishedTaskReply{}
    call("Coordinator.FinishedTask", &finArgs, &finReply)
}

协调器完成任务处理

  • 当工作节点完成一个任务后,它会通过调用 call 函数通知协调器。协调器接收到这个通知后,会更新任务状态,并可能唤醒其他等待任务的工作节点
func (c *Coordinator) FinishedTask(args *FinishedTaskArgs, reply *FinishedTaskReply) error {
    c.mu.Lock()
    defer c.mu.Unlock()

    switch args.TaskType {
    case Map:
        c.mapTasksFinished[args.TaskNum] = true
    case Reduce:
        c.reduceTasksFinished[args.TaskNum] = true
    default:
        log.Fatalf("Bad finished task? %s", args.TaskType)
    }

    c.cond.Broadcast()
    return nil
}
评论 1
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

千帐灯无此声

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值