👂 若月亮没来 (若是月亮还没来)(若是月亮还没来) - 王宇宙Leto/乔浚丞 - 单曲 - 网易云音乐
目录
🌼参考代码
🐙解析
实验原文要求仔细研读两份代码,并在作业过程中大胆借鉴
wc.go
- MapReduce 的插件,实现了 Map 和 Reduce 两个函数
- Map 函数接收输入文本的内容,分割成单词,并为每个单词生成一个键值对(键是单词,值是1)
- Reduce 函数接收 Map 生成的所有键值对,统计每个单词出现次数,并返回这个次数
mrsequential.go
- MapReduce 的主体,协调 Map 和 Reduce 任务的执行
- 检查命令行参数
(
os.Args[0] // 可执行文件
os.Args[1] // "wc.so"(插件)
os.Args[2] // "pg1.txt"(输入文件)
os.Args[3] // "pg2.txt"
)- 加载 wc.go,执行其中的 Map 和 Reduce 函数
- 读取输入文件内容,对每个文件调用 Map 函数
- Map 函数的输出按键 排序(输出单词)
- 排序的键值对进行 Reduce(归约)
- Reduce 函数的输出写入输出文件(统计次数)
🐟mrsequential.go
代码是单线程的,输入 --> Map --> sort --> Reduce --> 输出
package main
import (
"fmt"
"6.824/mr" // 引入MapReduce相关的数据结构和接口
"plugin" // 用于动态加载插件
"os" // 用于操作系统相关的功能,如命令行参数
"log" // 用于日志记录
"io/ioutil" // 用于I/O操作,如读取文件
"sort" // 用于排序
)
// ByKey 是一个用于按键排序的切片类型
type ByKey []mr.KeyValue
// Len 实现了 sort.Interface 接口的 Len 方法
func (a ByKey) Len() int { return len(a) }
// Swap 实现了 sort.Interface 接口的 Swap 方法
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Less 实现了 sort.Interface 接口的 Less 方法
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
func main() {
// 检查命令行参数数量
if len(os.Args) < 3 {
fmt.Fprintf(os.Stderr, "Usage: mrsequential xxx.so inputfiles...\n")
os.Exit(1)
}
// 加载插件中的 Map 和 Reduce 函数
mapf, reducef := loadPlugin(os.Args[1])
// 用于存储Map阶段的中间输出
intermediate := []mr.KeyValue{}
// 遍历所有输入文件
for _, filename := range os.Args[2:] {
// 打开文件
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
// 读取文件全部内容
content, err := ioutil.ReadAll(file)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
// 调用 Map 函数处理文件内容
kv := mapf(filename, string(content))
// 将Map结果添加到中间输出
intermediate = append(intermediate, kv...)
}
// 对中间输出按键排序
sort.Sort(ByKey(intermediate))
// 创建输出文件
oname := "mr-out-0"
ofile, _ := os.Create(oname)
// 调用 Reduce 函数并写入输出文件
i := 0
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
// 收集相同键的所有值
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
// 调用 Reduce 函数
output := reducef(intermediate[i].Key, values)
// 按格式写入输出文件
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j
}
// 关闭输出文件
ofile.Close()
}
// loadPlugin 从插件文件中加载 Map 和 Reduce 函数
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
// 打开插件
p, err := plugin.Open(filename)
if err != nil {
log.Fatalf("cannot load plugin %v", filename)
}
// 查找 Map 函数
xmapf, err := p.Lookup("Map")
if err != nil {
log.Fatalf("cannot find Map in %v", filename)
}
mapf := xmapf.(func(string, string) []mr.KeyValue)
// 查找 Reduce 函数
xreducef, err := p.Lookup("Reduce")
if err != nil {
log.Fatalf("cannot find Reduce in %v", filename)
}
reducef := xreducef.(func(string, []string) string)
// 返回 Map 和 Reduce 函数
return mapf, reducef
}
🐟mrapps/wc.go
Map 函数返回键值对切片,Reduce 函数将单词出现次数转化为字符串后返回
// 定义包名为 main,这是一个插件,可以被 MapReduce 框架动态加载。
package main
// 导入 MapReduce 框架的包,用于实现 Map 和 Reduce 函数。
import "6.824/mr"
// 导入 unicode 包,用于判断字符是否为字母。
import "unicode"
// 导入 strings 包,用于字符串操作。
import "strings"
// 导入 strconv 包,用于字符串和基本数据类型之间的转换。
import "strconv"
// Map 函数是 MapReduce 框架中的第一个阶段,它将对输入文件的每一行调用一次。
// 参数 filename 是输入文件的名称,contents 是文件的全部内容。
// 这个函数返回一系列键值对,其中键是单词,值是 "1"。
func Map(filename string, contents string) []mr.KeyValue {
// FieldsFunc 函数将根据 ff 函数来分割字符串。
// ff 函数是一个过滤函数,它返回 true 如果字符不是字母。
ff := func(r rune) bool { return !unicode.IsLetter(r) }
// 使用 FieldsFunc 函数根据 ff 函数分割 contents 字符串,得到单词数组。
words := strings.FieldsFunc(contents, ff)
// 初始化一个空的键值对切片,用于存储 Map 函数的输出。
kv := []mr.KeyValue{}
for _, w := range words {
// 对于每个单词 w,创建一个键值对,键是单词本身,值是 "1"。
kv = append(kv, mr.KeyValue{w, "1"})
}
// 返回包含所有单词和计数的键值对切片。
return kv
}
// Reduce 函数是 MapReduce 框架中的第二个阶段,它对每个唯一的键调用一次。
// 参数 key 是键,values 是所有映射任务为该键生成的值的列表。
// 这个函数返回一个字符串,表示键出现的次数。
func Reduce(key string, values []string) string {
// 使用 strconv.Itoa 函数将 values 切片的长度(即 key 出现的次数)转换为字符串。
return strconv.Itoa(len(values))
}
📕实验--准备
🎂概念
所谓“单机”,整个项目部署在一台机器上
所谓“集群”,集群中的每一个节点就是一个单机,每个单机运行同一个的项目,通过设置“调度者”,用户请求先发送到“调度者”,再由“调度者”根据所有节点的负载情况,分配任务,即负载均衡
(从单机到集群,代码无需修改,只需多部署几台服务器)
所谓 “分布式”,类似流水线(只是将串行改成了并行),每条线负责不同的功能,最终将一个个小功能,整合成一个项目
(也就是将原本部署在单机上的系统,拆分成一个个子系统,每个子系统都是独立的)
这些子系统存在依赖关系,在网络中通过 rpc(remote procedure call) 通信
🐋思路梳理
wc.go 是一个实现了 Map 和 Reduce 函数的插件
而 mrsequential.go 是 MapReduce 的顺序实现
(可以理解为“单机”实现,一台机器,单个进程,顺序执行)
我们要做的就是,将 mrsequential.go 拆分成 5 个文件,实现 MapReduce(词频统计) 的分布式部署 / 并行执行
main(程序入口)
- main/mrcoordinator.go 协调者初始化
- main/mrworker.go 工作者初始化
两个 main 文件不用修改,我们只需完成以下 3 个 mr/.... 文件即可
mr(具体实现)
- mr/coordinator.go 实现协调者(监视 worker,分配任务,处理失败,重新分配)
- mr/worker.go 实现工作者(请求任务,执行 Map,执行 Reduce,写入中间结果,写入最终结果)
- mr/rpc.go 协调者 与 工作者 间的远程调用 (定义了通信接口和数据结构)
🦖注意要点
- 修改 mr/ 下任何文件后,需要重新构建插件 wc.go,确保插件不依赖旧版本
go build -race -buildmode=plugin ../mrapps/wc.go
- 修改
mr/worker.go
中的Worker()
函数,通过 RPC 请求 coordinator 分配任务- 中间文件命名 mr-X-Y(X 为 Map 任务编号,Y 为 Reduce 任务编号)
- 使用 Go 的
encoding/json
包写入和读取 JSON 文件enc := json.NewEncoder(file) // json.Encoder实例,编码为 json 格式 for _, kv := ... { err := enc.Encode(&kv)
dec := json.NewDecoder(file) // json.Decoder 实例,解码为 json 格式 for { var kv KeyValue if err := dec.Decode(&kv); err != nil { break } kva = append(kva, kv) }
- 使用
mrapps/crash.go
插件测试崩溃恢复go build -race -buildmode=plugin crash.go // 编译插件文件 go run -race mrcoordinator.go pg-*.txt // 根据输入文件,启动 MapReduce 作业 go run -race mrworker.go crash.so // 运行 worker 进程,使用插件故意崩溃
- 为防止崩溃时部分写入,用
ioutil.TempFile
创建临时文件,os.Rename
原子地重命名
🐆初始代码--研读
初始代码可以先抄一遍,理解一下,捋清楚思路后,再开始做
main/mrcoordinator.go
创建协调者,通过命令行参数,传递输入文件给工作者,并在作业完成后退出程序
// 程序入口点
package main
// 引入 MapReduce 包,包含协调者和工作者的实现
import "6.824/mr"
// time 包,暂停时间
import "time"
// 引入 os 包,读取命令行参数。
import "os"
// 格式化输入输出
import "fmt"
func main() {
// 访问命令行参数,至少读取一个文件,第一个参数是程序名本身
if len(os.Args) < 2 {
// 如果参数数量小于2,打印到标准错误
fmt.Fprintf(os.Stderr, "Usage: mrcoordinator inputfiles...\n")
os.Exit(1)
}
// 创建协调者实例,除了程序名,剩下的参数作为输入文件传递
m := mr.MakeCoordinator(os.Args[1:], 10) // 10 个工作者
// 循环直到 MapReduce 作业完成
for m.Done() == false { // m.Done() 检查 mr 作业是否完成
// time.Sleep 暂停一秒
time.Sleep(time.Second)
}
// 作业完成后,再等待一秒钟,可能是为了确保所有输出都已经写入
time.Sleep(time.Second)
}
main/mrworker.go
从命令行参数中,获取插件文件,并将插件文件中的 Map 和 Reduce 函数,转化为具体函数类型(便于后续调用)
因为接口类型本身,不能直接被调用,需要转化为具体类型
package main
import (
"6.824/mr"
"plugin"
"os"
"fmt"
"log"
)
// main 是程序的入口点,当程序启动时最先执行的函数
func main() {
// 参数1:程序名 参数2:插件文件路径
if len(os.Args) != 2 {
// 写入标准错误流
fmt.Fprintf(os.Stderr, "Usage: mrworker xxx.so\n")
// 终止程序,并返回状态码 1 表示错误
os.Exit(1)
}
// 调用 loadPlugin 函数加载 Map 和 Reduce 函数
// Map, Reduce 函数,都来自于插件文件
// mapf 是 Map 函数,reducef 是 Reduce 函数
mapf, reducef := loadPlugin(os.Args[1])
// 调用 mr.Worker 启动 MapReduce 工作者进程,传入加载的 Map 和 Reduce 函数
mr.Worker(mapf, reducef)
}
// loadPlugin 函数用于从插件文件中加载 Map 和 Reduce 函数
// filename 插件文件的路径
func loadPlugin(filename string) (func(string, string) []mr.KeyValue, func(string, []string) string) {
// 使用 plugin.Open 函数打开插件文件,返回插件对象实例 p 和可能发生的错误 err
// p 包含 Map 和 Reduce 函数
p, err := plugin.Open(filename)
if err != nil {
log.Fatalf("cannot load plugin: %v", filename)
}
// p.Lookup 方法查找插件中名为 "Map" 的导出符号,其实就是 Map 函数
// xmapf 是一个 plugin.Symbol 类型的变量,用于存储从插件中查找到的 Map 函数符号
// plugin.Symbol 是一个接口类型,代表插件中的任意导出符号
xmapf, err := p.Lookup("Map")
if err != nil {
log.Fatalf("cannot find Map function in plugin: %v", filename)
}
// 类型断言用于确定 xmapf 中存储的具体函数类型
// 并将其从 plugin.Symbol 接口类型断言回其静态的函数类型
// xmapf.() 就是类型断言, 将 plugin.Symbol 转化为具体函数类型
mapf := xmapf.(func(string, string) []mr.KeyValue)
// 同上,查找并断言 Reduce 函数
xreducef, err := p.Lookup("Reduce")
if err != nil {
log.Fatalf("cannot find Reduce function in plugin: %v", filename)
}
reducef := xreducef.(func(string, []string) string)
// 返回加载并断言成功的 Map 和 Reduce 函数
return mapf, reducef
}
mr/coordinator.go
struct Coordinator:分配 MapReduce 任务到对应 worker
Example():rpc 处理函数的例子
server():启动 rpc 服务,监听来自 worker 的请求
Done():检查 MapReduce 作业是否完成
MakeCoordinator():创建并初始化 Coordinator 实例
package mr
import (
"log"
"net"
"os"
"net/rpc"
"net/http"
)
// Coordinator 负责管理和分配任务
type Coordinator struct {
// Your definitions here.
}
// RPC handlers for the worker to call.
// an example RPC handler.
// the RPC argument and reply types are defined in rpc.go
// rpc 调用的参数和返回值,在 rpc.go 中定义
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
reply.Y = args.X + 1
return nil // rpc 调用成功
}
// start a thread that listens for RPCs from worker.go
func (c *Coordinator) server() {
// 注册协调者实例,处理 RPC 调用
rpc.Register(c)
// 允许使用 HTTP 协议进行 RPC 通信
rpc.HandleHTTP()
// 协调者 socket 文件名
sockname := coordinatorSock()
// 监听前移除已存在的 socket 文件,避免监听失败
os.Remove(sockname)
// 监听 UNIX socket,准备接收来自 worker 的连接
l, e := net.Listen("unix", sockname)
if e != nil {
log.Fatal("listen error:", e)
}
// 新的 goroutine 中启动 HTTP 服务,以处理 RPC 请求
go http.Serve(l, nil)
}
// main/mrcoordinator.go 会定期调用 Done() 函数来检查整个作业是否已完成。
func (c *Coordinator) Done() bool {
ret := false
// Your code here to implement the check for completion of all tasks
// 在这里实现检查所有任务是否完成的逻辑,例如检查所有 Map 和 Reduce 任务的状态
return ret // 作业是否完成
}
// create a Coordinator
// main/mrcoordinator.go calls this function
// nReduce is the number of reduce tasks to use.
// The returned value is a pointer to the newly created Coordinator instance.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
// Your code here to initialize the Coordinator, e.g., load input files, setup tasks, etc
// 启动 RPC 服务器线程,以便监听和处理来自 worker 的 RPC 请求
c.server()
// 返回指向新创建的协调者实例的指针,这样调用者就可以通过这个指针来访问和操作协调者实例
return &c
}
mr/worker.go
KeyValue 结构体
ihash():返回 reduce 任务编号(用于发送 Map 输出的数据)
Worker():调用插件中的 map() 和 reduce() 函数
CallExample():展示 rpc 调用的完整流程,需要借助 call()
call():建立 rpc 连接,再发送 rpc 请求
// package mr - 定义了MapReduce作业的工作者包,包含实现MapReduce算法所需的结构和函数
// import语句 - 日志记录、rpc远程过程调用、哈希计算
package mr
import (
"fmt"
"log"
"net/rpc"
"hash/fnv"
)
// 定义 MapReduce 中的键值对
type KeyValue struct {
Key string
Value string
}
// 自定义的哈希函数,用于确定Map输出的键值对,应该发送到哪个Reduce任务
// Map阶段输出的键分配到不同的Reduce任务
func ihash(key string) int {
h := fnv.New32a() // 创建FNV-1a哈希生成器
// 字符串 key 转为 []byte 字节切片,因为 Wirte() 需要操作字节数据
h.Write([]byte(key)) // 将键的字节序列写入哈希生成器
// 使用按位与操作确保结果是一个非负整数,适合作为索引使用
// 0x7fffffff 就是 0111 1111 ... 1111,符号位为正,其他不变
return int(h.Sum32() & 0x7fffffff)
}
// Worker 函数 - 是MapReduce工作者的主要工作函数
// 它调用用户提供的map和reduce函数
// main/mrworker.go calls this function.
// 传入的两个参数是 mapf() 和 reducef()
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// 工作者实现细节将在这里编写,包括从协调者接收任务和发送结果
}
// example function to show how to make an RPC call to the coordinator.
func CallExample() {
// {X: 99} 结构体字面量, X 初始化为 99
args := ExampleArgs{X: 99} // rpc通信中传递的参数
reply := ExampleReply{} // 用于存储响应的返回值
// 发送RPC请求到协调者,等待回复
// 服务名称.方法名称,rpc包会根据这个字符串,找到对应的服务和方法进行调用
call("Coordinator.Example", &args, &reply)
fmt.Printf("reply.Y %v\n", reply.Y)
}
// send an RPC request to the coordinator, wait for the response.
func call(rpcname string, args interface{}, reply interface{}) bool {
sockname := coordinatorSock() // 获取协调者socket名称
c, err := rpc.DialHTTP("unix", sockname) // 建立RPC连接
if err != nil {
log.Fatal("dialing:", err)
}
defer c.Close()
// Call 方法是 net/rpc 包中的 *rpc.Client 类型的一个实例方法
err = c.Call(rpcname, args, reply) // 发送RPC请求
if err == nil {
return true
}
fmt.Println(err)
return false
}
mr/rpc.go
ExampleArgs 和 ExampleReply,表示 rpc 参数和 rpc 返回值两种类型
coordinatorSock():为协调者生成 socket 文件名
package mr
// RPC definitions
// remember to capitalize(大写) all names
import "os" // 操作系统功能,获取用户ID
import "strconv" // 字符串转换
// example to show how to declare the arguments(参数)
// and reply(返回值) for an RPC
type ExampleArgs struct {
X int
}
type ExampleReply struct {
Y int
}
// Add your RPC definitions here
// Cook up a unique-ish UNIX-domain socket name
// in /var/tmp, for the coordinator
// Can't use the current directory since
// Athena AFS doesn't support UNIX-domain sockets.
// 这里指定的是一个UNIX域socket的文件路径前缀,它位于/var/tmp目录下
// 并且以"824-mr-"作为前缀,以确保socket文件名的唯一性
// 用于获取协调者的socket文件名,以便建立RPC连接
func coordinatorSock() string {
// 定义UNIX域socket的基础路径,前缀为"/var/tmp/824-mr-"
s := "/var/tmp/824-mr-"
// 将当前用户的UID转换为字符串并追加到基础路径之后,创建一个唯一的socket文件名
s += strconv.Itoa(os.Getuid())
return s // 协调者监听的socket文件的路径
}
🦈实验--开始
🐙阅读源码--小技巧
参考我用的 Kimi 或者 通义千问
将源码分 1~3 次发给 AI(1000+行的文件完全可以)
让 AI 分 3~5 次输出
然后进行下列提问
AI 就会给出精准的解答
1
2
3
🐋伪代码
mr/coordinator.go
package mr
import (
"log"
"net"
"os"
"sync"
"time"
"net/http"
"net/rpc"
)
// Coordinator 用于管理 MapReduce 作业的协调器
type Coordinator struct {
mu sync.Mutex // 互斥锁,用于保护协调器状态免受并发访问的影响
cond *sync.Cond // 条件变量,允许协调器等待直到map任务完成才分配reduce任务
mapFiles []string // map任务的输入文件列表
nMapTasks int // map任务的数量
nReduceTasks int // reduce任务的数量
mapTasksFinished []bool // 记录map任务是否完成
mapTasksIssued []time.Time // 记录map任务的分配时间
reduceTasksFinished []bool // 记录reduce任务是否完成
reduceTasksIssued []time.Time // 记录reduce任务的分配时间
isDone bool // 当所有reduce任务完成时,设置为true
}
// Example 用于测试RPC通信的示例函数
func (c *Coordinator) Example(args *ExampleArgs, reply *ExampleReply) error {
reply.Y = args.X + 1 // 示例RPC处理函数,返回args.X加1的结果
return nil
}
// server 启动协调器的RPC服务器,监听来自工作节点的请求
func (c *Coordinator) server() {
rpc.Register(c) // Coordinator 对象注册自己到 rpc
rpc.HandleHTTP() // 处理HTTP请求
sockname := coordinatorSock() // 调用coordinatorSock()生成套接字名称
os.Remove(sockname) // 移除旧的套接字文件
l, e := net.Listen("unix", sockname) // 监听UNIX域套接字
if e != nil {
log.Fatal("listen error:", e) // 监听错误,记录日志
}
go http.Serve(l, nil) // 启动HTTP服务,参数l是监听器,nil表示使用默认的多路复用器
}
// Done 检查所有reduce任务是否完成,以确定整个作业是否完成
func (c *Coordinator) Done() bool {
c.mu.Lock() // 加锁
defer c.mu.Unlock() // 延迟解锁
return c.isDone // 返回作业是否完成的状态
}
// FinishedTask 处理工作节点报告任务完成的RPC请求
func (c *Coordinator) FinishedTask(args *FinishedTaskArgs, reply *FinishedTaskReply) error {
c.mu.Lock() // 加锁
defer c.mu.Unlock() // 延迟解锁
switch args.TaskType { // 根据任务类型处理完成的任务
case Map:
c.mapTasksFinished[args.TaskNum] = true // 标记map任务完成
case Reduce:
c.reduceTasksFinished[args.TaskNum] = true // 标记reduce任务完成
default:
log.Fatalf("Bad finished task? %s", args.TaskType) // 未知任务类型,记录日志并退出
}
c.cond.Broadcast() // 唤醒GetTask处理器循环:任务已完成,因此我们可能可以分配另一个任务
return nil
}
// GetTask 处理工作节点请求新任务的RPC调用
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
c.mu.Lock() // 加锁
defer c.mu.Unlock() // 延迟解锁
reply.NReduceTasks = c.nReduceTasks // 设置回复中的reduce任务数量
reply.NMapTasks = c.nMapTasks // 设置回复中的map任务数量
for {
// 直到没有剩余的map任务,一直分配map任务
mapDone := true
for m, done := range c.mapTasksFinished {
if !done { // 当前 map 任务未完成
// 未分配 或 超时(假设worker已崩溃),重新分配
if c.mapTasksIssued[m].IsZero() || time.Since(c.mapTasksIssued[m]).Seconds() > 10 {
reply.TaskType = Map // 任务类型
reply.TaskNum = m // 任务编号
reply.MapFile = c.mapFiles[m] // map文件
c.mapTasksIssued[m] = time.Now() // 更新分配时间
return nil // 结束当前函数
} else { // 当前 map 任务进行中
mapDone = false // 标记还有未完成的map任务
}
}
}
// 如果所有map任务都在进行中且没有超时,等待分配另一个任务
if !mapDone {
c.cond.Wait() // 等待条件变量
} else {
// 所有map任务都已完成!
break
}
}
// 所有map任务都已完成,现在分配reduce任务
for {
redDone := true
for r, done := range c.reduceTasksFinished {
if !done {
// 如果任务从未分配过,或者自从分配以来时间太长,可能worker已崩溃
if c.reduceTasksIssued[r].IsZero() || time.Since(c.reduceTasksIssued[r]).Seconds() > 10 {
reply.TaskType = Reduce // 设置任务类型为Reduce
reply.TaskNum = r // 设置任务编号
c.reduceTasksIssued[r] = time.Now() // 更新任务分配时间
return nil // 返回nil表示没有错误
} else {
redDone = false // 标记还有未完成的reduce任务
}
}
}
// 如果所有reduce任务都在进行中且没有超时,等待分配另一个任务
if !redDone {
c.cond.Wait() // 等待条件变量
} else {
// 所有reduce任务都已完成!
break
}
}
// 如果所有map和reduce任务都已完成,向查询的工作节点发送Done任务类型,并设置isDone为true
reply.TaskType = Done // 设置任务类型为Done
c.isDone = true // 标记所有任务完成
return nil // 返回nil表示没有错误
}
// MakeCoordinator 初始化并返回一个新的Coordinator实例
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{} // 作为值初始化Coordinator
c.cond = sync.NewCond(&c.mu) // 创建条件变量
c.mapFiles = files // 设置map任务的输入文件
c.nMapTasks = len(files) // 设置map任务的数量
c.mapTasksFinished = make([]bool, len(files)) // 初始化map任务完成状态
c.mapTasksIssued = make([]time.Time, len(files)) // 初始化map任务分配时间
c.nReduceTasks = nReduce // 设置reduce任务的数量
c.reduceTasksFinished = make([]bool, nReduce) // 初始化reduce任务完成状态
c.reduceTasksIssued = make([]time.Time, nReduce) // 初始化reduce任务分配时间
// 定期唤醒GetTask处理器线程,检查是否有任务未完成
go func() {
for {
c.mu.Lock() // 加锁
c.cond.Broadcast() // 广播条件变量
c.mu.Unlock() // 解锁
time.Sleep(time.Second) // 休眠一秒
}
}()
// 确保方法接收器是指针
c.server() // 在指针上调用自己的服务器方法
return &c // 返回指向Coordinator的指针
}
mr/worker.go
特别提醒,map 阶段,具体相同键的值,经过 ihash() 会被写入同一个临时文件,最后也就是写入同一个中间文件。
而 reduce 阶段会读取当前 reduce 任务所对应的每一个中间文件,然后排序。
package mr
import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"os"
"sort"
"net/rpc"
"hash/fnv"
)
// KeyValue 结构体用于存储键值对
type KeyValue struct {
Key string
Value string
}
// ByKey 用于按键排序的类型,实现sort.Interface接口
type ByKey []KeyValue
// Len 实现sort.Interface接口的方法
func (a ByKey) Len() int { return len(a) }
// Swap 实现sort.Interface接口的方法
func (a ByKey) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
// Less 实现sort.Interface接口的方法,用于比较两个元素
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }
// ihash 函数使用FNV哈希算法来选择每个KeyValue的reduce任务编号
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key)) // 将键写入哈希计算器
return int(h.Sum32() & 0x7fffffff) // 返回哈希值
}
// 根据map和reduce任务编号得到文件名
func getIntermediateFile(mapTaskN int, redTaskN int) string {
// 返回中间文件名
return fmt.Sprintf("mr-%d-%d", mapTaskN, redTaskN)
}
// 临时文件原子重命名为 最终输出文件
func finalizeReduceFile(tmpFile string, taskN int) {
finalFile := fmt.Sprintf("mr-out-%d", taskN) // 生成最终文件名
os.Rename(tmpFile, finalFile)
}
// 临时文件原子重命名为 中间文件
func finalizeIntermediateFile(tmpFile string, mapTaskN int, redTaskN int) {
finalFile := getIntermediateFile(mapTaskN, redTaskN) // 根据编号获取文件名
os.Rename(tmpFile, finalFile)
}
// performMap 实现map任务,读取文件内容,应用map函数,并将结果写入临时文件
func performMap(filename string, taskNum int, nReduceTasks int, mapf func(string, string) []KeyValue) {
// 读取要映射的内容
file, err := os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename) // 打开文件失败,记录日志并退出
}
content, err := ioutil.ReadAll(file) // 读取文件内容
if err != nil {
log.Fatalf("cannot read %v", filename) // 读取文件失败,记录日志并退出
}
file.Close() // 关闭文件
// 用map函数转化为 键值对切片
kva := mapf(filename, string(content)) // 返回 string, string 键值对切片
// 为每个文件创建临时文件和编码器
tmpFiles := []*os.File{} // 文件切片
tmpFilenames := []string{}
encoders := []*json.Encoder{} // 编码器切片
for r := 0; r < nReduceTasks; r++ {
tmpFile, err := ioutil.TempFile("", "") // 创建临时文件
if err != nil {
log.Fatalf("cannot open tmpfile") // 创建失败,记录日志并退出
}
tmpFiles = append(tmpFiles, tmpFile) // 临时文件添加到文件切片
tmpFilename := tmpFile.Name()
tmpFilenames = append(tmpFilenames, tmpFilename) // 临时文件名添加
enc := json.NewEncoder(tmpFile) // 用临时文件创建JSON编码器
encoders = append(encoders, enc) // 编码器添加
}
// 遍历每个键值对,将键值对通过 json 编码写入对应的临时文件
// 相同键的键值对,会被写入同一个临时文件,也就是写入同一个中间文件
for _, kv := range kva {
r := ihash(kv.Key) % nReduceTasks // 计算reduce任务编号
encoders[r].Encode(&kv) // 编码并写入对应的临时文件
}
for _, f := range tmpFiles {
f.Close() // 关闭所有临时文件
}
// 原子重命名临时文件为中间文件
// 每个 map 任务产生 nReduceTasks 个中间文件
for r := 0; r < nReduceTasks; r++ {
finalizeIntermediateFile(tmpFilenames[r], taskNum, r)
}
}
// performReduce 处理reduce任务,读取所有中间文件,按键排序,应用reduce函数,并将结果写入最终文件
func performReduce(taskNum int, nMapTasks int, reducef func(string, []string) string) {
// 收集所有与此reduce任务对应的中间文件,并收集相应的键值对
kva := []KeyValue{}
for m := 0; m < nMapTasks; m++ {
iFilename := getIntermediateFile(m, taskNum) // 获取中间文件名
file, err := os.Open(iFilename) // 打开中间文件
if err != nil {
log.Fatalf("cannot open %v", iFilename) // 打开文件失败,记录日志并退出
}
defer file.Close() // 确保文件最终被关闭
dec := json.NewDecoder(file) // 创建JSON解码器
for {
var kv KeyValue
if err := dec.Decode(&kv); err != nil { // 读取键值对
break // 读取完毕,退出循环
}
kva = append(kva, kv) // 将键值对添加到集合中
}
}
// 按键排序
sort.Sort(ByKey(kva)) // 使用sort包对键值对按键进行排序
// 获取临时reduce文件以写入值
tmpFile, err := ioutil.TempFile("", "") // 创建临时文件
if err != nil {
log.Fatalf("cannot open tmpfile") // 创建临时文件失败,记录日志并退出
}
defer tmpFile.Close() // 确保文件最终被关闭
tmpFilename := tmpFile.Name() // 获取临时文件名
// 对所有具有相同键的值应用reduce函数一次
keyBegin := 0
for keyBegin < len(kva) {
keyEnd := keyBegin + 1
// 此循环找到所有具有相同键的值 - 它们因为键已排序而聚集在一起
for keyEnd < len(kva) && kva[keyEnd].Key == kva[keyBegin].Key {
keyEnd++
}
values := []string{} // string 切片
for k := keyBegin; k < keyEnd; k++ {
values = append(values, kva[k].Value) // 收集相同键的所有值
}
// 每个值都是 "1", reducef() 直接返回相同键的个数
output := reducef(kva[keyBegin].Key, values) // 返回 "1" 的个数
// 将输出写入reduce任务临时文件,显示出来比如 apple 23
fmt.Fprintf(tmpFile, "%v %v\n", kva[keyBegin].Key, output) // 格式化写入键值对
// 转到下一个键
keyBegin = keyEnd
}
// 原子重命名reduce文件为最终reduce任务文件
finalizeReduceFile(tmpFilename, taskNum) // 重命名临时文件为最终输出文件
}
// Worker 函数由 main/mrworker.go 调用,是工作节点的主循环,负责与协调器通信,执行任务
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
// 每次循环都会发送 rpc 调用
for {
// 工作节点实现在这里
args := GetTaskArgs{}
reply := GetTaskReply{}
// 这将等待直到我们被分配一个任务!
// 建立 rpc 连接 + 发送请求
call("Coordinator.GetTask", &args, &reply) // 调用协调器的GetTask方法获取任务
switch reply.TaskType {
case Map:
performMap(reply.MapFile, reply.TaskNum, reply.NReduceTasks, mapf) // 执行map任务
case Reduce:
performReduce(reply.TaskNum, reply.NMapTasks, reducef) // 执行reduce任务
case Done:
// 没有剩余任务,让我们退出
os.Exit(0) // 退出程序,程序内所有 goroutine 都会终止
default:
fmt.Errorf("Bad task type? %s", reply.TaskType) // 错误的任务类型,记录错误
}
// 告诉协调器我们完成了
finArgs := FinishedTaskArgs{
TaskType: reply.TaskType,
TaskNum: reply.TaskNum,
}
finReply := FinishedTaskReply{}
// 通知 coordinator 任务已完成
// 调用的是 Coordinator 的 FinishedTask() 方法
call("Coordinator.FinishedTask", &finArgs, &finReply) // 调用协调器的FinishedTask方法报告任务完成
}
// 取消注释以向协调器发送Example RPC
// CallExample()
}
// CallExample 函数展示了如何向协调器发起RPC调用,用于测试RPC通信
func CallExample() {
// 声明一个参数结构体
args := ExampleArgs{}
// 填充参数
args.X = 99
// 声明一个回复结构体
reply := ExampleReply{}
// 发送RPC请求,等待回复
call("Coordinator.Example", &args, &reply) // 调用协调器的Example方法
// reply.Y 应该是 100
fmt.Printf("reply.Y %v\n", reply.Y) // 打印回复
}
// call 函数向协调器发送一个RPC请求并等待响应,处理与协调器的通信
func call(rpcname string, args interface{}, reply interface{}) bool {
sockname := coordinatorSock() // 来自本包的函数,生成协调器的套接字文件名
// c 是 *rpc.Client 类型,表示客户端连接
c, err := rpc.DialHTTP("unix", sockname) // 建立与协调器的RPC连接
if err != nil {
log.Fatal("dialing:", err) // 连接失败,记录日志并退出
}
defer c.Close() // 确保连接最终被关闭
err = c.Call(rpcname, args, reply) // 发送RPC请求
if err == nil {
return true // 请求成功
}
fmt.Println(err) // 打印错误信息
return false // 请求失败
}
mr/rpc.go
package mr
import (
"os"
"strconv"
)
// RPC definitions.
// 定义RPC接口和数据结构
// TaskType 定义任务类型,用于区分map任务、reduce任务和作业完成状态
type TaskType int
const (
Map TaskType = 1 // Map任务类型
Reduce TaskType = 2 // Reduce任务类型
Done TaskType = 3 // 作业完成状态
)
// GetTaskArgs 定义请求任务时的参数结构
type GetTaskArgs struct{}
// GetTaskReply 定义从协调器获取任务时返回的参数结构
type GetTaskReply struct {
// 任务类型,map或reduce
TaskType TaskType
// 任务编号
TaskNum int
// 用于Map任务,指定 reduce 任务数量
NReduceTasks int
// 用于Map任务,指定输入的文件名
MapFile string
// 用于Reduce任务,指定输入的文件数量
NMapTasks int
}
// FinishedTaskArgs 定义报告任务完成时的参数结构
type FinishedTaskArgs struct {
// 任务类型
TaskType TaskType
// 任务编号
TaskNum int
}
// FinishedTaskReply 定义报告任务完成后的返回结构,当前不需要返回任何数据
type FinishedTaskReply struct{}
// ExampleArgs 定义示例RPC调用的参数结构
type ExampleArgs struct {
X int
}
// ExampleReply 定义示例RPC调用的返回结构
type ExampleReply struct {
Y int
}
// coordinatorSock 生成协调器的UNIX域套接字文件名,确保每个用户和每次运行都有唯一的文件名
func coordinatorSock() string {
s := "/var/tmp/824-mr-" // 套接字文件名前缀
s += strconv.Itoa(os.Getuid()) // 将当前用户的UID附加到套接字文件名,确保唯一性
return s // 返回完整的套接字文件名
}