Hive On Tez小文件合并的技术调研

Hive On Tez小文件合并的技术调研

背景

在升级到CDP7.1.5之后,默认的运算引擎变成了Tez,之前这篇有讲过:

https://lizhiyong.blog.csdn.net/article/details/126688391

具体参考Cloudera的官方文档:https://docs.cloudera.com/cdp-private-cloud-base/7.1.3/hive-introduction/topics/hive-unsupported.html

并且只能用Tez,调度、血缘等重度依赖租来的阿里云DataPhin,那么最常用的离线跑批任务还是要使用HQL【也就是Hive On Tez】。HQL上手门槛极低,之前搞那种Oracle数据库开发的也可以一周内升任Sql Boy岗位,这是其一;PySpark任务或者用Java/Scala打Jar包的Spark任务由于平台的缺陷无法记录血缘,做溯源及下游影响分析时也是极不方便,只能怼人天靠人工,效率低下。HQL虽然低端并且性能不高,但是一时半会儿还不能被取缔。

原先在CDH5.16中,HQL任务是Hive On MapReduce。写在HQL最上方用于合并小文件的参数到了CDP就不能使用了:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

所以急需合并小文件,最好还是这种方式,就可以不用做太大的改动。

方式

直接set启用

既然之前的MapReduce任务有小文件合并的功能,那么找一找,还真就找到了更多的参数,在HiveConf这个Java类里。

在Apache Hive3.1.2的Java源码找到:

package org.apache.hadoop.hive.conf;

public class HiveConf extends Configuration {
HIVEMERGEMAPFILES("hive.merge.mapfiles", true,
        "Merge small files at the end of a map-only job"),
    HIVEMERGEMAPREDFILES("hive.merge.mapredfiles", false,
        "Merge small files at the end of a map-reduce job"),
    HIVEMERGETEZFILES("hive.merge.tezfiles", false, "Merge small files at the end of a Tez DAG"),
    HIVEMERGESPARKFILES("hive.merge.sparkfiles", false, "Merge small files at the end of a Spark DAG Transformation"),
    HIVEMERGEMAPFILESSIZE("hive.merge.size.per.task", (long) (256 * 1000 * 1000),
        "Size of merged files at the end of the job"),
    HIVEMERGEMAPFILESAVGSIZE("hive.merge.smallfiles.avgsize", (long) (16 * 1000 * 1000),
        "When the average output file size of a job is less than this number, Hive will start an additional \n" +
        "map-reduce job to merge the output files into bigger files. This is only done for map-only jobs \n" +
        "if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true."),
    HIVEMERGERCFILEBLOCKLEVEL("hive.merge.rcfile.block.level", true, ""),
    HIVEMERGEORCFILESTRIPELEVEL("hive.merge.orcfile.stripe.level", true,
        "When hive.merge.mapfiles, hive.merge.mapredfiles or hive.merge.tezfiles is enabled\n" +
        "while writing a table with ORC file format, enabling this config will do stripe-level\n" +
        "fast merge for small ORC files. Note that enabling this config will not honor the\n" +
        "padding tolerance config (hive.exec.orc.block.padding.tolerance)."),
    MERGE_CARDINALITY_VIOLATION_CHECK("hive.merge.cardinality.check", true,
      "Set to true to ensure that each SQL Merge statement ensures that for each row in the target\n" +
        "table there is at most 1 matching row in the source table per SQL Specification.")

}

参考:https://lizhiyong.blog.csdn.net/article/details/126688391

这篇有扒源码,看到了Hive最终会把AST解析成Tez或者Spark的DAG,那么此处的hive.merge.tezfileshive.merge.sparkfiles按照注释的字面意思,也就很容易看明白,是要在DAG的结尾处来一次merge。这当然就是最简单的方式。由于某些配置默认是false也就是停用状态,所以必须手动set启用才能生效。

所以只需要在HQL任务头上+这些参数即可:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.tezfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

使用Apache Hive自己搭建了Hive On Spark的难兄难弟们就可以照猫画虎:

set hive.merge.mapfiles=true;	--这种对只有map的任务有效
set hive.merge.mapredfiles=true;	--这种对有reduce的任务有效
set hive.merge.sparkfiles=true;
set hive.merge.size.per.task=256000000;	--合并时每个task默认的处理的文件体积256M
set hive.merge.smallfiles.avgsize=16000000;	--写文件时平均体积小于默认的16M就合并

不过Hive的Calcite和CBO、RBO比起商业化DataBrick的Catalyst优势不明显,这种方式性能并不理想,使用SparkSQL的应该才是大多数。

验证

已经在prod环境充分验证,加入参数后敲:

hadoop fs -count
hadoop fs -du -s -h
hadoop fs -du -h

可以看到小文件情况明显改善。

到这里,肤浅的SQL Boy们就可以止步,已经够用了。

调整reducer个数

有时候,出于种种原因【数据量基本可以预知】,我们希望可以像Spark那样写死文件的个数。这样后续的任务调优、资源配额就比较方便。

Spark默认200个Task:

spark.sql.shuffle.partitions=200

只需要最后写文件或者sql跑insert overwrite前来一句免Shuffle高性能的:

df.coalesce(1)

或者肤浅的Sql Boy们比较能接受的Shuffle低性能的:

df1.repartition(1)

即可。Tez当然也是有办法实现这种写死文件个数的效果,那就是限制reducer个数,利用利用distribute by打散到reducer,每个reducer会写一个文件,最终文件个数就是写死的reducer个数。

验证

create external table if not exists test_small_file1(
	id int,
    message string
)
stored as parquet
location '这里写自己集群的即可'
;

create external table if not exists test_small_file2(
	id int,
    message string
)
partitioned by(
	dt string
)
stored as parquet
location '这里写自己集群的即可'
;

作为结果表。

create external table if not exists test_data_source(
	id int,
    message string,
    dt
)
stored as parquet
location '这里写自己集群的即可'
;

create external table if not exists test_data_source_100w(
	id int,
    message string,
    dt
)
stored as parquet
location '这里写自己集群的即可'
;

create external table if not exists test_data_source_1000w(
	id int,
    message string,
    dt
)
stored as parquet
location '这里写自己集群的即可'
;

作为取数的数据源表。

然后insert数据产生小文件:

insert into test_data_source values(1,'a1',20230310);
...	--这里自己填充
insert into test_data_source values(20,'a20',20230310);

insert into test_data_source values(21,'a21',20230311);
...
insert into test_data_source values(40,'a40',20230311);

insert into test_data_source values(100,'a100',20230312);
...
insert into test_data_source values(199,'a199',20230312);

insert into test_data_source values(200,'a200',20230313);
...
insert into test_data_source values(299,'a299',20230313);

此时在Impala执行:

show files in db_name.test_data_source;

或者直接HDFS查看:

hadoop fs -du -h

都可以看到240个小文件。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source;

可以看到结果是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source;

可以看到是6个小文件。

这点数据就会产生小文件。

Mock出百万级别的数据量:

insert overwrite table test_data_source_100w
select * from test_date_source;
;
insert overwrite table test_data_source_100w
select * from test_data_source_100w;
;

多次执行,直到数据量超过100w。

直接灌数据到结果表:

insert overwrite table test_small_file1
select id,message from test_data_source_100w;

可以看到结果还是2个小文件。

灌入分区表:

insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w;

可以看到是8个小文件。

重头戏来了:

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file1
select id,message from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

打散为10个小文件。

set hive.exec.reducers.bytes.per.reduce=5120000000;
set mapreduce.job.reduces=10;
insert overwrite table test_small_file2 partition(dt)
select id,message,dt from test_data_source_100w
distribute by cast(rand() * 100 as int)
;

由于4个日期,所以打散成40个小文件。

之后Mock出千万级别的也类似。直接说结果:

最终test_small_file1表有5个小文件,test_small_file2有20个小文件。

但是使用参数后:

最终test_small_file1表有10个小文件,test_small_file2有40个小文件。

所以可以看出,distribute by cast(rand() * 100 as int)这个操作利用了Hash Partitioner,结果散步到了Reduce Task。最后文件的个数=Reduce Task个数。

分区表是每个Partition的结果再次Hash Partitioner打散,所以每个分区路径的parquet文件个数都是=Reduce Task个数。

由于hive.exec.reducers.bytes.per.reduce可以设置的很大,那么只需要修改mapreduce.job.reduces的值,就可以让Tez跑HQL任务时写固定个数的文件。

ACID表的ORC小文件

由于Sql Boy们比较顽固,总是想着把Hive当RDBMS来用,于是。。。说多了都是泪。。。

Hive的ACID表会产生大量的小文件,阈值配置的不合适时,触发合并的次数就很少,导致小文件越来越多。这有点像Hudi的MOR【merge on read】。当下游HQL任务从这些ACID表读数据时,就会由于Map Task过多,出现极其严重的性能问题,这个故事慢慢讲。于是Sql Boy们养成了手动合并小文件的好习惯:

alter table tb_name compact 'major';
alter table tb_name compact 'minor';

可以通过:

show compactions;

查看提交到Yarn的合并任务的记录,自己把unix的timestamp换算成人类能看懂的时间,就知道近期的合并情况。。。任务跑得慢,想起来了就手动合并一下。。。

总结

Hive On Tez主要就是这3种合并小文件的方式。

在这里插入图片描述
转载请注明出处:https://lizhiyong.blog.csdn.net/article/details/129511318

评论 2
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值