MapReduce优化----参数的解释以及设置

Map阶段优化


参数:io.sort.mb(default 100)

当map task开始运算,并产生中间数据时,其产生的中间结果并非直接就简单的写入磁盘。
而是会利用到了内存buffer来进行已经产生的部分结果的缓存,
并在内存buffer中进行一些预排序来优化整个map的性能。
每一个map都会对应存在一个内存buffer,map会将已经产生的部分结果先写入到该buffer中,
这个buffer默认是100MB大小,
但是这个大小是可以根据job提交时的参数设定来调整的,
当map的产生数据非常大时,并且把io.sort.mb调大,
那么map在整个计算过程中spill的次数就势必会降低,
map task对磁盘的操作就会变少,
如果map tasks的瓶颈在磁盘上,这样调整就会大大提高map的计算性能。
 
参数:io.sort.spill.percent(default 0.80,也就是80%)
map在运行过程中,不停的向该buffer中写入已有的计算结果,
但是该buffer并不一定能将全部的map输出缓存下来,
当map输出超出一定阈值(比如100M),那么map就必须将该buffer中的数据写入到磁盘中去,
这个过程在mapreduce中叫做spill。
map并不是要等到将该buffer全部写满时才进行spill,
因为如果全部写满了再去写spill,势必会造成map的计算部分等待buffer释放空间的情况。
所以,map其实是当buffer被写满到一定程度(比如80%)时,就开始进行spill。
这个阈值也是由一个job的配置参数来控制,
这个参数同样也是影响spill频繁程度,进而影响map task运行周期对磁盘的读写频率的。
但非特殊情况下,通常不需要人为的调整。调整io.sort.mb对用户来说更加方便。
 
参数:io.sort.factor
当map task的计算部分全部完成后,如果map有输出,就会生成一个或者多个spill文件,这些文件就是map的输出结果。
map在正常退出之前,需要将这些spill合并(merge)成一个,所以map在结束之前还有一个merge的过程。
merge的过程中,有一个参数可以调整这个过程的行为,该参数为:io.sort.factor。
该参数默认为10。它表示当merge spill文件时,最多能有多少并行的stream向merge文件中写入。
比如如果map产生的数据非常的大,产生的spill文件大于10,而io.sort.factor使用的是默认的10,
那么当map计算完成做merge时,就没有办法一次将所有的spill文件merge成一个,而是会分多次,每次最多10个stream。
这也就是说,当map的中间结果非常大,调大io.sort.factor,
有利于减少merge次数,进而减少map对磁盘的读写频率,有可能达到优化作业的目的。
 
参数:min.num.spill.for.combine(default 3)
当job指定了combiner的时候,我们都知道map介绍后会在map端根据combiner定义的函数将map结果进行合并。
运行combiner函数的时机有可能会是merge完成之前,或者之后,这个时机可以由一个参数控制,
即min.num.spill.for.combine(default 3),当job中设定了combiner,并且spill数最少有3个的时候,
那么combiner函数就会在merge产生结果文件之前运行。
通过这样的方式,就可以在spill非常多需要merge,并且很多数据需要做conbine的时候,
减少写入到磁盘文件的数据数量,同样是为了减少对磁盘的读写频率,有可能达到优化作业的目的。
 
参数:mapred.compress.map.output(default false)
减少中间结果读写进出磁盘的方法不止这些,还有就是压缩。
也就是说map的中间,无论是spill的时候,还是最后merge产生的结果文件,都是可以压缩的。
压缩的好处在于,通过压缩减少写入读出磁盘的数据量。
对中间结果非常大,磁盘速度成为map执行瓶颈的job,尤其有用。
控制map中间结果是否使用压缩的参数为:mapred.compress.map.output(true/false)。
将这个参数设置为true时,那么map在写中间结果时,就会将数据压缩后再写入磁盘,读结果时也会采用先解压后读取数据。
这样做的后果就是:写入磁盘的中间结果数据量会变少,但是cpu会消耗一些用来压缩和解压。
所以这种方式通常适合job中间结果非常大,瓶颈不在cpu,而是在磁盘的读写的情况。
说的直白一些就是用cpu换IO。
根据观察,通常大部分的作业cpu都不是瓶颈,除非运算逻辑异常复杂。所以对中间结果采用压缩通常来说是有收益的。
 
参数:mapred.map.output.compression.codec( default org.apache.hadoop.io.compress.DefaultCodec)
当采用map中间结果压缩的情况下,用户还可以选择压缩时采用哪种压缩格式进行压缩,
现在hadoop支持的压缩格式有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等压缩格式。
通常来说,想要达到比较平衡的cpu和磁盘压缩比,LzoCodec比较适合。但也要取决于job的具体情况。
用户若想要自行选择中间结果的压缩算法,

可以设置配置参数:mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他用户自行选择的压缩方式 


Reduce阶段优化


reduce的运行是
分成三个阶段的。分别为copy->sort->reduce。
由于job的每一个map都会根据reduce(n)数将数据分成map 输出结果分成n个partition,
所以map的中间结果中是有可能包含每一个reduce需要处理的部分数据的。
所以,为了优化reduce的执行时间,hadoop中是等job的第一个map结束后,
所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据。

这个过程就是通常所说的shuffle,也就是copy过程。


参数:mapred.reduce.parallel.copies(default 5)
说明:每个reduce并行下载map结果的最大线程数
Reduce task在做shuffle时,实际上就是从不同的已经完成的map上去下载属于自己这个reduce的部分数据,
由于map通常有许多个,所以对一个reduce来说,下载也可以是并行的从多个map下载,这个并行度是可以调整的,
调整参数为:mapred.reduce.parallel.copies(default 5)。
默认情况下,每个只会有5个并行的下载线程在从map下数据,如果一个时间段内job完成的map有100个或者更多,
那么reduce也最多只能同时下载5个map的数据,

所以这个参数比较适合map很多并且完成的比较快的job的情况下调大,有利于reduce更快的获取属于自己部分的数据。


参数:mapred.reduce.copy.backoff(default 300秒)
说明:reduce下载线程最大等待时间(秒)
reduce的每一个下载线程在下载某个map数据的时候,有可能因为那个map中间结果所在机器发生错误,
或者中间结果的文件丢失,或者网络瞬断等等情况,这样reduce的下载就有可能失败,
所以reduce的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,
并在随后尝试从另外的地方下载(因为这段时间map可能重跑)。
所以reduce下载线程的这个最大的下载时间段是可以调整的,
调整参数为:mapred.reduce.copy.backoff(default 300秒)。
如果集群环境的网络本身是瓶颈,那么用户可以通过调大这个参数来避免reduce下载线程被误判为失败的情况。

不过在网络环境比较好的情况下,没有必要调整。通常来说专业的集群网络不应该有太大问题,所以这个参数需要调整的情况不多。


参数:io.sort.factor
Reduce将map结果下载到本地时,同样也是需要进行merge的,所以io.sort.factor的配置选项同样会影响reduce进行merge时的行为,

该参数的详细介绍上文已经提到,当发现reduce在shuffle阶段iowait非常的高的时候,就有可能通过调大这个参数来加大一次merge时的并发吞吐,优化reduce效率。


参数:mapred.job.shuffle.input.buffer.percent(default 0.7)
说明:用来缓存shuffle数据的reduce task heap百分比
Reduce在shuffle阶段对下载来的map数据,并不是立刻就写入磁盘的,而是会先缓存在内存中,然后当使用内存达到一定量的时候才刷入磁盘。
这个内存大小的控制就不像map一样可以通过io.sort.mb来设定了,而是通过另外一个参数来设置:mapred.job.shuffle.input.buffer.percent(default 0.7),
这个参数其实是一个百分比,意思是说,shuffile在reduce内存中的数据最多使用内存量为:0.7 × maxHeap of reduce task。
也就是说,如果该reduce task的最大heap使用量(通常通过mapred.child.java.opts来设置,比如设置为-Xmx1024m)的一定比例用来缓存数据。
默认情况下,reduce会使用其heapsize的70%来在内存中缓存数据。

如果reduce的heap由于业务原因调整的比较大,相应的缓存大小也会变大,这也是为什么reduce用来做缓存的参数是一个百分比,而不是一个固定的值了。


参数:mapred.job.shuffle.merge.percent(default 0.66)
说明:缓存的内存中多少百分比后开始做merge操作
假设mapred.job.shuffle.input.buffer.percent为0.7,reduce task的max heapsize为1G,
那么用来做下载数据缓存的内存就为大概700MB左右,这700M的内存,跟map端一样,
也不是要等到全部写满才会往磁盘刷的,而是当这700M中被使用到了一定的限度(通常是一个百分比),就会开始往磁盘刷。
这个限度阈值也是可以通过job参数来设定的,设定参数为:mapred.job.shuffle.merge.percent(default 0.66)。

如果下载速度很快,很容易就把内存缓存撑大,那么调整一下这个参数有可能会对reduce的性能有所帮助。


参数:mapred.job.reduce.input.buffer.percent(default 0.0)
说明:sort完成后reduce计算阶段用来缓解数据的百分比
当reduce将所有的map上对应自己partition的数据下载完成后,就会开始真正的reduce计算阶段
(中间有个sort阶段通常时间非常短,几秒钟就完成了,因为整个下载阶段就已经是边下载边sort,然后边merge的)。
当reduce task真正进入reduce函数的计算阶段的时候,有一个参数也是可以调整reduce的计算行为。
也就是:mapred.job.reduce.input.buffer.percent(default 0.0)。
由于reduce计算时肯定也是需要消耗内存的,而在读取reduce需要的数据时,同样是需要内存作为buffer,
这个参数是控制,需要多少的内存百分比来作为reduce读已经sort好的数据的buffer百分比。
默认情况下为0,也就是说,默认情况下,reduce是全部从磁盘开始读处理数据。
如果这个参数大于0,那么就会有一定量的数据被缓存在内存并输送给reduce,
当reduce计算逻辑消耗内存很小时,可以分一部分内存用来缓存数据,反正reduce的内存闲着也是闲着。
<think>好的,我现在需要帮助用户了解如何在MapReduce中实现倒排索引。首先,我得回忆一下倒排索引的基本概念。倒排索引是搜索引擎中的核心数据结构,它记录每个单词出现在哪些文档中以及出现的次数。比如,用户输入一个关键词,搜索引擎能快速找到包含该词的所有文档。 接下来,用户想知道用MapReduce怎么实现。MapReduce分为Map和Reduce两个阶段,我需要理清这两个阶段分别处理什么数据。根据引用[3],倒排索引可以看作是WordCount的扩展,不过需要统计单词在多个文件中的出现次数。所以,Mapper的任务应该是读取每个文件,然后对每个单词生成键值对,键是单词,值是文件名和1,表示在该文件出现了一次。比如,如果单词"apple"在文件doc1.txt中出现,Mapper会输出<"apple", "doc1.txt 1">。 然后是Shuffle和Sort阶段,系统会自动将相同单词的键值对聚合在一起,发送给同一个Reducer。这时候,Reducer需要处理的是同一个单词对应的多个文件名和计数。Reducer需要将这些值合并,统计每个文件中该单词出现的总次数,并汇总成“文件名:次数”的形式。比如,Reducer收到<"apple", ["doc1.txt 1", "doc1.txt 1", "doc2.txt 1"]>,处理后应该输出<"apple", "doc1.txt:2, doc2.txt:1">。 不过,用户可能需要具体的步骤和代码示例。根据引用[4],设置Job的主类、Mapper和Reducer类,以及输入输出类型是关键。在Mapper中,需要获取到文件名,这可以通过InputSplit来实现,或者使用Hadoop的FileInputFormat相关方法。比如,在setup方法中获取文件的路径,然后在map方法中将文件名和单词组合成值。 代码方面,Mapper类需要继承自Mapper,重写map方法。比如,读取每一行的单词,然后输出(word, filename + "\t1")。Reducer类则接收这些键值对,遍历所有的值,统计每个文件中的出现次数,最后合并成需要的格式。 可能还需要考虑Combiner的使用,但倒排索引的情况可能不适合,因为Combiner需要在本地合并,而Combiner和Reducer的处理逻辑可能不同,需要谨慎处理。比如,如果Combiner只是局部合并,可能会影响最终的全局统计。 此外,用户可能需要示例代码来理解具体实现。根据引用[3]和引用[4],可以编写一个简单的Mapper和Reducer,然后在Driver类中设置Job的参数,比如输入输出路径、Mapper和Reducer类等。代码中需要注意如何获取文件名,这可能需要使用上下文对象来获取输入文件的信息。 最后,需要验证这个方法的正确性,比如在多个文件中运行,检查输出是否符合预期,每个单词是否正确地关联到所有出现的文件及其次数。同时,要考虑性能优化,比如合理设置Map和Reduce的任务数量,或者处理大文件时的分块策略。 总结一下,实现步骤大致分为:1. Mapper处理每个文件,生成单词和文件名的计数;2. Shuffle阶段聚合相同单词;3. Reducer统计每个文件的出现次数并格式化输出。代码示例需要展示如何获取文件名、处理键值对以及合并结果。</think>### MapReduce实现倒排索引的方法 #### 倒排索引概念 倒排索引(Inverted Index)是一种将**单词映射到其所在文档位置**的数据结构,常用于搜索引擎。例如,单词$w$出现在文档$d_1$中3次、$d_2$中5次,则索引为:$w \rightarrow \{d_1:3, d_2:5\}$[^1]。 #### 实现步骤 1. **Mapper阶段** 输入:文档集合(每行文本对应一个文件) 输出:键值对 $(word, fileName:1)$ 逻辑: - 读取输入文件,获取文件名(通过`FileSplit`获取输入路径) - 对文本分词后,为每个单词生成中间键值对 2. **Shuffle阶段** MapReduce框架自动将相同$word$的键值对合并,形成 $(word, [fileName:1, fileName:1, ...])$ 3. **Reducer阶段** 输入:$(word, [fileName:1, fileName:1, ...])$ 输出:$(word, fileName:count)$ 逻辑: - 统计每个$fileName$的出现次数 - 合并结果为`文件名:次数`的列表 #### 示例代码 ```java // Mapper类 public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text> { private Text word = new Text(); private Text fileInfo = new Text(); @Override protected void setup(Context context) throws IOException { // 获取输入文件名 String fileName = ((FileSplit) context.getInputSplit()).getPath().getName(); fileInfo.set(fileName + ":1"); } @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split(" "); for (String w : words) { word.set(w); context.write(word, fileInfo); // 输出格式:(word, fileName:1) } } } // Reducer类 public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> { private Text result = new Text(); @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Map<String, Integer> fileCounts = new HashMap<>(); for (Text val : values) { String[] parts = val.toString().split(":"); String fileName = parts[0]; int count = Integer.parseInt(parts[1]); fileCounts.put(fileName, fileCounts.getOrDefault(fileName, 0) + count); } StringBuilder sb = new StringBuilder(); for (Map.Entry<String, Integer> entry : fileCounts.entrySet()) { sb.append(entry.getKey()).append(":").append(entry.getValue()).append(", "); } result.set(sb.substring(0, sb.length() - 2)); // 去除末尾逗号 context.write(key, result); // 输出格式:(word, file1:3, file2:5) } } ``` #### 优化点 1. **Combiner局部聚合**:在Mapper后添加Combiner,合并同一节点上的相同单词统计[^2] 2. **文件分块策略**:根据数据量调整`InputSplit`大小,提升并行度 3. **压缩中间结果**:启用Map输出压缩,减少Shuffle数据传输量
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值