Yarn 和 MR 资源配置
配置项参考官网:https://apache.github.io/hadoop/
Yarn 资源配置
修改 yarn-site.xml,调整的 Yarn 参数均与 CPU、内存等资源有关,配置如下:
|
|
修改后重新分发该配置文件并重启 Yarn
MR 资源配置
MapReduce 资源配置主要包括 Map Task 的内存和 CPU 核数,以及 Reduce Task 的内存和 CPU 核数。核心配置参数如下:
mapreduce.map.memory.mb
该参数的含义是,单个 Map Task 申请的 container 容器内存大小,其默认值为 1024。该值不能超出 yarn.scheduler.maximum-allocation-mb 和 yarn.scheduler.minimum-allocation-mb 规定的范围。
该参数需要根据不同的计算任务单独进行配置,在 hive 中,可直接使用如下方式为每个 SQL 语句单独进行配置:set mapreduce.map.memory.mb=2048;
mapreduce.map.cpu.vcores
该参数的含义是,单个 Map Task 申请的 container 容器 cpu 核数,其默认值为 1。该值一般无需调整。如需调整要修改 mapred-site.xml 文件(mapred-default.xml)
mapreduce.reduce.cpu.vcores
该参数的含义是,单个 Reduce Task 申请的 container 容器 cpu 核数,其默认值为 1。该值一般无需调整。如需调整要修改 mapred-site.xml 文件(mapred-default.xml)
mapreduce.reduce.memory.mb
该参数的含义是,单个 Reduce Task 申请的 container 容器内存大小,其默认值为 1024。该值同样不能超出 yarn.scheduler.maximum-allocation-mb 和 yarn.scheduler.minimum-allocation-mb 规定的范围。
该参数需要根据不同的计算任务单独进行配置,在 hive 中,可直接使用如下方式为每个 SQL 语句单独进行配置:set mapreduce.reduce.memory.mb=2048;
Explain 查看执行计划
Explain 用于呈现 HQL 语句的详细执行步骤,由一系列 Stage 组成,简单的理解为 HQL 查询语句的不同执行阶段,这一系列 Stage 具有依赖关系,每个 Stage 对应一个 MapReduce Job 或一个文件系统操作等。
若某个 Stage 对应的一个 MapReduce Job,则其 Map 端和 Reduce 端的计算逻辑分别由 Map Operator Tree 和 Reduce Operator Tree 进行描述,Operator Tree 由一系列的 Operator 组成,一个 Operator 代表在 Map 或 Reduce 阶段的一个单一的逻辑操作,例如 TableScan Operator,Select Operator,Join Operator 等。具体如下图:

常见的 Operator 及其作用如下
TableScan:表扫描操作,通常 map 端第一个操作肯定是表扫描操作
Select Operator:选取操作
Group By Operator:map 端的分组聚合操作,在后面的分组聚合中会讲到
Reduce Output Operator:输出到 reduce 操作
Filter Operator:过滤操作
Join Operator:join 操作
File Output Operator:文件输出操作
Fetch Operator 客户端获取数据操作
Explain 语法
EXPLAIN [FORMATTED | EXTENDED | DEPENDENCY]
- FORMATTED:将执行计划以 JSON 字符串的形式输出
- EXTENDED:输出执行计划中的额外信息,通常是读写的文件名等信息
- DEPENDENCY:输出执行计划读取的表及分区
例:
hive (default)>
explain formatted
select user_id,count(*) from order_detail group by user_id;
分组聚合优化
分组聚合是通过 MR Job 实现的,map 端读取数据,并按照分组字段分区,通过 shuffle,把数据发到 reduce,各组数据在 reduce 端完成最终的聚合运算。
分组聚合的优化主要围绕减少 shuffle 数据量进行,具体做法是 map-side 聚合。map-side 聚合是在 map 端维护一个 hash table,先利用其完成数据的部分聚合,再把聚合的结果按照分组字段分区,发到 reduce 端完成最终聚合,以此提高分组聚合运算效率。简而言之就是增加了一个 map 端的部分聚合过程,以减少 shuffle 的工作量,进而减少 reduce 端的聚合工作量。
map-side 聚合相关参数如下
–启用 map-side 聚合,默认是 true
set hive.map.aggr=true;
–用于检测源表数据是否适合进行 map-side 聚合。检测的方法是:系统自动先对若干条数据进行 map-side 聚合,若聚合后的条数和聚合前的条数比值小于该值,则认为该表适合进行 map-side 聚合;否则,认为该表数据不适合进行 map-side 聚合,后续数据便不再进行 map-side 聚合。0.5 意味着平均有 2 条数据可以聚合成 1 条,1 意味着没有出现任何的聚合
set hive.map.aggr.hash.min.reduction=0.5;
–用于hive.map.aggr.hash.min.reduction=0.5 检测源表是否适合 map-side 聚合的条数。
set hive.groupby.mapaggr.checkinterval=100000;
–map-side 聚合所用的 hash table 占用 map task 堆内存的最大比例,若超出该值,则会对 hash table 进行一次 flush。
set hive.map.aggr.hash.force.flush.memory.threshold=0.7;
优化前 VS 优化后
set hive.map.aggr=false 关闭分组聚合优化,查看执行效果,在 Map 端没有了 Group By Operator

set hive.map.aggr=true 开启分组聚合优化,查看执行效果,在 Map 端有了 Group By Operator,

若发生 map-side 优化,优化后比优化前的 HQL 执行耗时应该有所减少,且 map 的 output 数量明显小于 input 数量。
若没有触发 map-side,则 map 的 output 数量虽然比 input 数量有所减少但可以忽略不计。具体有没有触发 map-side 可以去 web UI 界面查看 map 日志。
注意!!map-side 聚合不够智能,即 map 端的分组聚合是否执行一定程度上会受到分组字段在表中存储的位置和分布的影响,这是底层存储问题,未必是因为数据真的不适合分组聚合。要解决此问题可以提前对数据分区分桶,使用分区分桶表,使得同一区域存储的数据分布具有一定的相似性,这样聚合结果会有所提升。
例:
1)select province_id,count(*) from order_detail group by province_id;
该语句查询所有订单,根据省份 id 分组聚合,省份只有 34 个,这样 map 后的数据应该只有 34 条,所以聚合结果是应该是比较可观的。所以 group by 的基数越小,一般越适合聚合。
2)select product_id,count(*) from order_detail group by product_id;
若 product_id 这一分组字段在 order_detail 表中分布比较散,那么可能会导致 hive 在表中切片抽样进行 map-side 检测的时候测试聚合结果>0.5,那么最终就没有使用 map-side 聚合。所以说如果能保证抽样数据的测试结果<=0.5,就会实现分组聚合,当然也可以调整hive.map.aggr.hash.min.reduction 的值以提高 map-side 的命中率。
若 100w 的数据集分组聚合之后的输出>100w,可能的原因是多次触发了 hash table 的 flush
Join 优化
Join 优化就是控制 HQL 语句走哪种 join 算法,这些 join 算法有的快,有的慢,有的激进,有的保守。我们要做的就是让 HQL 走最适合自己的 join 算法。
Common Join(普通 join)
原理
hive 中最稳定的 join 算法,其通过一个 MapReduce Job 完成一个 join 操作。Map 端负责读取 join 操作所需表的数据,并按照关联字段进行分区,通过 Shuffle,将其发送到 Reduce 端,相同 key 的数据在 Reduce 端完成最终的 Join 操作。

需要注意的是,HQL 语句中的 join 操作和执行计划中的 Common Join 任务并非一对一的关系,即 HQL 中的 A 表 join B 表 join C 表在 common join 中未必也是两个 join 操作,一个 HQL 语句中的相邻的且关联字段相同的多个 join 操作可以合并为一个 Common Join 任务。
例: 1)hive (default)
select a.val, b.val, c.val from
a join b on (a.key = b.key1) join c on (c.key = b.key1)
上述 sql 语句中两个 join 操作的关联字段均为 b 表的 key1 字段,则该语句中的两个 join 操作可由一个 Common Join 任务实现,也就是可通过 1 个 Map Reduce 任务实现。
2)hive (default)> select a.val, b.val, c.val from
a join b on (a.key = b.key1) join c on (c.key = b.key2)
上述 sql 语句中的两个 join 操作关联字段各不相同,则该语句的两个 join 操作需要各自通过一个 Common Join 任务实现,也就是通过 2 个 Map Reduce 任务实现。
Map Join
原理
Map Join 算法可以通过一个 MR 和一个 MapJoin 阶段完成一个 join 操作,省去了 shuffle 和 reduce,在第二个 map 阶段进行表的 join,不需要进入 reduce 阶段。其适用场景为大表 join 小表。第一个 Job 会读取小表数据,将其制作为 hash table,并上传至 Hadoop 分布式缓存(本质上是上传至 HDFS)。第二个 Job 会先从分布式缓存中读取小表数据,并缓存在 Map Task 的内存中,然后扫描大表数据,这样在 map 端即可完成关联操作。如下图所示:

mapreduce local task 是本地任务,读取小表数据,因为小表数据占用内存资源少,所以不上传到 yarn,直接在本地读取效率更高 ,读取后序列化生成 hash table 并上传到 hdfs 的 cache 中。
其中 Mapper 是实现 Map 阶段功能的代码组件。它接受原始数据作为输入,执行某种转换操作,然后输出一组键值对。这些键值对会作为 Reduce 阶段的输入。
例:SELECT a.key, a.value FROM a JOIN b ON a.key = b.key
前提 b 表是一张小表,具体小表有多小,由参数 hive.mapjoin.smalltable.filesize 来决定,默认值是 25M。
参数列表:
1)小表自动选择 Mapjoin
set hive.auto.convert.join=true;
默认值:false。该参数为 true 时,Hive 自动对左边的表统计量,若是小表就加入内存,即对小表使用 Map join 2)小表阀值 set hive.mapjoin.smalltable.filesize=25000000; ?默认值:25M
优化
法一:hint 提示 手动指定通过 map join 算法,该方式已经过时,不推荐使用。
hive (default)> select /_+ map join(ta) _/
ta.id, tb.id from table_a ta join table_b tb on ta.id=tb.id;
法二:自动触发 Hive 在编译 HQL 语句阶段,起初所有的 join 操作均采用 Common Join 算法实现。
之后在物理优化阶段,Hive 会根据每个 Common Join 任务所需表的大小判断该 Common Join 任务是否能够转换为 Map Join 任务,若满足要求(小表大小<指定的阈值),便将 Common Join 任务自动转换为 Map Join 任务。
但有些 Common Join 任务所需的表大小,在 HQL 的编译阶段是未知的(例如对子查询进行 join 操作),所以这种 Common Join 任务是否能转换成 Map Join 任务在编译阶是无法确定的。
针对这种情况,Hive 会在编译阶段生成一个条件任务(Conditional Task),其下会包含一个计划列表,计划列表中包含转换后的 Map Join 任务以及原有的 Common Join 任务。最终具体采用哪个计划,是在运行时决定的。大致思路如下图所示:

Map join 自动转换的具体判断逻辑如下图所示:

图片详情看尚硅谷 P135
寻找大表候选人时还不知道每张表的大小,那么选择规则是看 join 方式,有 innner join、left join、right join 等等。
inner join:每个表都可能是大表候选人。
left join:默认左表为大表候选人,右表当作小表,这样小表会缓存到内存中,以大表为主,从大表中一条条 join 内存中的小表,如果反过来把大表缓存到内存中,以小表为主,从小表中一条条 join 内存中的大表,若出现大表有该字段而小表没有的情况,这种情况下就会出现大量数据 join 失败,小表数据少,大表数据多,那么会因为小表浪费很多数据,所以通常是左表为大表,右表为小表。
right join:左表当作小表,右表为大表候选人。
full outer join:找不到大表候选人,因为全外联要返回两个表的全部数据,两个表都要去遍历,就无法 map join 优化。
涉及参数:
–启动 Map Join 自动转换
set hive.auto.convert.join=true;
–一个 Common Join operator 转为 Map Join operator 的判断条件:若该 Common Join 相关的表中,把每一个表都当作大表候选人,若除大表之外的任意一张已知大小的表的大小>大表候选人,则该组合不成立,不生成 map join,反之生成一个 Map Join 计划。此时可能存在多种组合均满足该条件,则 hive 会为每种满足条件的组合均生成一个 Map Join 计划,同时还会保留原有的 Common Join 计划作为后备(back up)计划,实际运行时,优先执行 Map Join 计划,若不能执行成功,则启动 Common Join 后备计划。
set hive.mapjoin.smalltable.filesize=250000;
–开启无条件转 Map Join
set hive.auto.convert.join.noconditionaltask=true; -无条件转 Map Join 时的小表之和阈值,若一个 Common Join operator 相关的表中,存在 n-1 张表的大小总和<=该值,此时 hive 便不会再为每种 n-1 张表的组合均生成 Map Join 计划,同时也不会保留 Common Join 作为后备计划。而是只生成一个最优的 Map Join 计划。 set hive.auto.convert.join.noconditionaltask.size=10000000;
优化案例
hive (default)> select * from order_detail od
join product_info product on od.product_id = product.id
join province_info province on od.province_id = province.id;
优化前
上述 SQL 语句共有三张表进行两次 join 操作,且两次 join 操作的关联字段不同。故优化前的执行计划应该包含两个 Common Join operator,也就是由两个 MapReduce 任务实现。执行计划如下图所示:

优化思路
使用如下语句获取表/分区的大小信息:
hive (default)>
desc formatted table_name partition(partition_col=‘partition’);
经分析,参与 join 的三张表,数据量如下:

方案一:
启用 Map Join 自动转换。
set hive.auto.convert.join=true;
不使用无条件转 Map Join。
set hive.auto.convert.join.noconditionaltask=false;
调整 hive.mapjoin.smalltable.filesize 参数,使其大于等于 product_info。
set hive.mapjoin.smalltable.filesize=25285707;
这样可保证将两个 Common Join operator 均可转为 Map Join operator,并保留 Common Join 作为后备计划,保证计算任务的稳定。调整完的执行计划如下图:

方案二:
启用 Map Join 自动转换。
set hive.auto.convert.join=true;
使用无条件转 Map Join。
set hive.auto.convert.join.noconditionaltask=true;
调整 hive.auto.convert.join.noconditionaltask.size 参数,使其大于等于 product_info 和 province_info 之和。
set hive.auto.convert.join.noconditionaltask.size=25286076;
这样可直接将两个 Common Join operator 转为两个 Map Join operator,并且由于两个 Map Join operator 的小表大小之和小于等于 hive.auto.convert.join.noconditionaltask.size,故两个 Map Join operator 任务可合并为同一个。这个方案计算效率最高,但需要的内存也是最多的。
调整完的执行计划如下图:
方案三:
启用 Map Join 自动转换。
set hive.auto.convert.join=true;
使用无条件转 Map Join。
set hive.auto.convert.join.noconditionaltask=true;
调整 hive.auto.convert.join.noconditionaltask.size 参数,使其等于 product_info。
set hive.auto.convert.join.noconditionaltask.size=25285707;
这样可直接将两个 Common Join operator 转为 Map Join operator,但不会将两个 Map Join 的任务合并。该方案计算效率比方案二低,但需要的内存也更少。
调整完的执行计划如下图:

Bucket Map Join
原理
Bucket Map Join 是对 Map Join 算法的改进,其打破了 Map Join 只适用于大表 join 小表的限制,可用于大表 join 大表的场景。分桶其实就是把大表化成了“小表”,然后 Map-Side Join 解决。
Bucket Map Join 的核心思想是:若能保证参与 join 的表均为分桶表,且关联字段为分桶字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍,就能保证参与 join 的两张表的分桶之间具有明确的关联关系,所以就可以在两表的分桶间进行 Map Join 操作了。这样一来,第二个 Job 的 Map 端就无需再缓存小表的全表数据了,而只需缓存其所需的分桶即可。其原理如图所示:
第一个 map 对较小的表 tableB 的每个 bucket 序列化成 hash table,上传到 hdfs cache 中,第二个 map 对较大的表 tableA 的每个桶单独切片,有几个桶就有几个 mapper
优化
hint 提示
Bucket Map Join 不支持自动转换,啊!原来是 hive 团队在 hive2.x 已经放弃维护 MR 计算引擎,建议使用 spark 等计算引擎(看到这乐死我了 tmd 白学了)。

参数:
–关闭 cbo 优化,cbo 会导致 hint 信息被忽略
set hive.cbo.enable=false;
–map join hint 默认会被忽略(因为已经过时),需将如下参数设置为 false
set hive.ignore.mapjoin.hint=false;
–启用 bucket map join 优化功能
set hive.optimize.bucketmapjoin = true;
优化案例
hive (default)> select _ from( select _ from order_detail where dt=‘2020-06-14’) od
join( select * from payment_detail where dt=‘2020-06-14’) pd on od.id=pd.order_detail_id;
优化前
上述 SQL 语句共有两张表一次 join 操作,故优化前的执行计划应包含一个 Common Join 任务,通过一个 MapReduce Job 实现。执行计划如下图所示:

优化思路
经分析,参与 join 的两张表,数据量如下。

两张表都相对较大,若采用普通的 Map Join 算法,则 Map 端需要较多的内存来缓存数据,可以选择为 Map 段分配更多的内存,来保证任务运行成功。但是,Map 端的内存不可能无上限的分配,所以当参与 Join 的表数据量均过大时,可以考虑采用 Bucket Map Join 算法。
创建两个分桶表,order_detail 建议分 16 个 bucket,payment_detail 建议分 8 个 bucket,注意分桶个数的倍数关系以及分桶字段。然后向其中导入数据。
设置优化参数:
–关闭 cbo 优化,cbo 会导致 hint 信息被忽略,需将如下参数修改为 false
set hive.cbo.enable=false;
–map join hint 默认会被忽略(因为已经过时),需将如下参数修改为 false
set hive.ignore.mapjoin.hint=false;
–启用 bucket map join 优化功能,默认不启用,需将如下参数修改为 true
set hive.optimize.bucketmapjoin = true;
重写 SQL 语句:
hive (default)>
select /_+ mapjoin(pd) _/ * from order_detail_bucketed od
join payment_detail_bucketed pd on od.id = pd.order_detail_id;
执行结果如下:

使用
hive (default)>
explain extended select /_+ mapjoin(pd) _/ *
from order_detail_bucketed od
join payment_detail_bucketed pd on od.id = pd.order_detail_id;查看执行计划,在 Map Join Operator 中看到 “BucketMapJoin: true”
Sort Merge Bucket Map Join(SMB map join)
原理
SMB Map Join 基于 Bucket Map Join。SMB Map Join 要求,参与 join 的表均为分桶表,且需保证分桶内的数据是有序的,且分桶字段、排序字段和关联字段为相同字段,且其中一张表的分桶数量是另外一张表分桶数量的整数倍。
SMB Map Join 同 Bucket Join 一样,同样是利用两表各分桶之间的关联关系,在分桶之间进行 join 操作,不同的是,分桶之间的 join 操作的实现原理。Bucket Map Join,两个分桶之间的 join 实现原理为 Hash Join 算法;而 SMB Map Join,两个分桶之间的 join 实现原理为 Sort Merge Join 算法。
Hash Join 和 Sort Merge Join 均为关系型数据库中常见的 Join 实现算法。Hash Join 的原理相对简单,就是对参与 join 的一张表构建 hash table,然后扫描另外一张表,然后进行逐行匹配。Sort Merge Join 需要在两张按照关联字段排好序的表中进行,其原理如图所示:
Hive 中的 SMB Map Join 就是对两个分桶的数据按照上述思路进行 Join 操作。可以看出,SMB Map Join 与 Bucket Map Join 相比,在进行 Join 操作时,Map 端是无需对整个 Bucket 构建 hash table,也无需在 Map 端缓存整个 Bucket 数据的,每个 Mapper 只需按顺序逐个 key 读取两个分桶的数据进行 join 即可。
优化
Sort Merge Bucket Map Join 有两种触发方式,包括 Hint 提示和自动转换。Hint 提示已过时,不推荐使用。下面是自动转换的相关参数:
–启动 Sort Merge Bucket Map Join 优化
set hive.optimize.bucketmapjoin.sortedmerge=true;
–使用自动转换 SMB Join
set hive.auto.convert.sortmerge.join=true;
和 bucket map join 一样,创建分桶表并导入数据 ,设置参数,运行 HQL,结果如下:

数据倾斜优化
数据倾斜问题,通常是指参与计算的数据分布不均,即某个 key 或者某些 key 的数据量远超其他 key,导致在 shuffle 阶段,大量相同 key 的数据被发往同一个 Reduce,进而导致该 Reduce 所需的时间远超其他 Reduce,成为整个任务的瓶颈。
Hive 中的数据倾斜常出现在分组聚合和 join 操作的场景中,下面分别介绍在上述两种场景下的优化思路。
分组聚合导致的数据倾斜
Hive 中未经优化的分组聚合,是通过一个 MapReduce Job 实现的。Map 端负责读取数据,并按照分组字段分区,通过 Shuffle,将数据发往 Reduce 端,各组数据在 Reduce 端完成最终的聚合运算。
如果 group by 分组字段的值分布不均,就可能导致大量相同的 key 进入同一 Reduce,从而导致数据倾斜问题。
由分组聚合导致的数据倾斜问题,有以下两种解决思路:
Map-Side 聚合
前文提过,此处略过
Skew-GroupBy 优化
原理是启动两个 MR 任务,第一个 MR 按照随机数分区,将数据分散发送到 Reduce,完成部分聚合,第二个 MR 把打散的数据按照分组字段分区,完成最终聚合。
优化前
该表数据中的 province_id 字段是存在倾斜的,若不经过优化,通过观察任务的执行过程,是能够看出数据倾斜现象的。

优化后
–启用 skew-groupby
set hive.groupby.skewindata=true;
–关闭 map-side 聚合(map side 聚合默认是开启的)
set hive.map.aggr=false;
开启 Skew-GroupBy 优化后,可以很明显看到该 sql 执行在 yarn 上启动了两个 mr 任务,第一个 mr 打散数据,第二个 mr 把打散后的数据进行分组聚合。
Join 导致的数据倾斜
未经优化的 join 操作,默认是使用 common join 算法,也就是通过一个 MapReduce Job 完成计算。Map 端负责读取 join 操作所需表的数据,并按照关联字段进行分区,通过 Shuffle,将其发送到 Reduce 端,相同 key 的数据在 Reduce 端完成最终的 Join 操作。
如果关联字段的值分布不均,就可能导致大量相同的 key 进入同一 Reduce,从而导致数据倾斜问题。由 join 导致的数据倾斜问题,有如下三种解决方案:
map join
略过
skew join
原理是为倾斜的大 key 单独启动一个 map join 任务进行计算,其余 key 进行正常的 common join。原理图如下:

–启用 skew join 优化
set hive.optimize.skewjoin=true;
–触发 skew join 的阈值,若某个 key 的行数超过该参数值,则触发
set hive.skewjoin.key=100000;
这种方案对参与 join 的源表大小没有要求,但是对两表中倾斜的 key 的数据量有要求,要求一张表中的倾斜 key 的数据量比较小(方便走 map join)。
任务并行度优化
Hive 的计算任务由 MapReduce 完成,故并行度的调整需要分为 Map 端和 Reduce 端。
Map 端并行度
Map 端的并行度,也就是 Map 的个数。是由输入文件的切片数决定的。一般情况下,Map 端的并行度无需手动调整。
以下特殊情况可考虑调整 map 端并行度: 1)查询的表中存在大量小文件
按照 Hadoop 默认的切片策略,一个小文件会单独启动一个 map task 负责计算。若查询的表中存在大量小文件,则会启动大量 map task,造成计算资源的浪费。这种情况下,可以使用 Hive 提供的 CombineHiveInputFormat,多个小文件合并为一个切片,从而控制 map task 个数。相关参数如下:
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
2)map 端有复杂的查询逻辑
若 SQL 语句中有正则替换、json 解析等复杂耗时的查询逻辑时,map 端的计算会相对慢一些。若想加快计算速度,在计算资源充足的情况下,可考虑增大 map 端的并行度,令 map task 多一些,每个 map task 计算的数据少一些。相关参数如下:
–一个切片的最大值
set mapreduce.input.fileinputformat.split.maxsize=256000000;
Reduce 端并行度
Reduce 端的并行度,可由用户自己指定,也可由 Hive 自行根据该 MR Job 输入的文件大小进行估算。
Reduce 端的并行度的相关参数如下:
–指定 Reduce 端并行度,默认值为-1,表示用户未指定
set mapreduce.job.reduces;
–Reduce 端并行度最大值
set hive.exec.reducers.max;
–单个 Reduce Task 计算的数据量,用于估算 Reduce 并行度
set hive.exec.reducers.bytes.per.reducer;
Reduce 端并行度的确定逻辑如下:
若指定参数 mapreduce.job.reduces 的值为一个非负整数,则 Reduce 并行度为指定值。否则,Hive 自行估算 Reduce 并行度,估算逻辑如下:
假设 Job 输入的文件大小为 totalInputBytes
参数 hive.exec.reducers.bytes.per.reducer 的值为 bytesPerReducer。
参数 hive.exec.reducers.max 的值为 maxReducers。
则 Reduce 端的并行度为:

根据上述描述,可以看出,Hive 自行估算 Reduce 并行度时,是以整个 MR Job 输入的文件大小作为依据的。因此,在某些情况下其估计的并行度很可能并不准确,此时就需要用户根据实际情况来指定 Reduce 并行度了。
在默认情况下,是会进行 map-side 聚合的,也就是 Reduce 端接收的数据,实际上是 map 端完成聚合之后的结果。观察任务的执行过程,会发现,每个 map 端输出的数据只有 34 条记录,共有 5 个 map task。
也就是说 Reduce 端实际只会接收 170(34*5)条记录,故理论上 Reduce 端并行度设置为 1 就足够了。这种情况下,用户可通过以下参数,自行设置 Reduce 端并行度为 1,这样把 5 个文件合并为只输出 1 个文件。
–指定 Reduce 端并行度,默认值为-1,表示用户未指定
set mapreduce.job.reduces=1;
小文件合并优化
Map 端输入的小文件合并,和 Reduce 端输出的小文件合并。
合并 Map 端输入的小文件
将多个小文件划分到一个切片中,进而由一个 Map Task 去处理。目的是防止为单个小文件启动一个 Map Task,浪费计算资源。
相关参数为:
–可将多个小文件切片,合并为一个切片,进而由一个 map 任务处理(默认) set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
合并 Reduce 端输出的小文件
将多个小文件合并成大文件。目的是减少 HDFS 小文件数量。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动 1 个额外的任务进行合并。
相关参数为:
–开启合并 map only 任务输出的小文件,默认 false
set hive.merge.mapfiles=true;
–开启合并 map reduce 任务输出的小文件,默认 false
set hive.merge.mapredfiles=true;
–合并后的文件大小
set hive.merge.size.per.task=256000000;
–触发小文件合并任务的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并
set hive.merge.smallfiles.avgsize=16000000;
若 reduce 端设置并行度为 5,则输出 5 个文件。下图为输出文件,可以看出,5 个均为小文件:
要避免 5 个小文件产生,可以设置 reduce 端并行度为 1,有几个 reduce 并行就有几个文件产生,保证其输出结果只有一个文件或启用 hive 合并小文件优化。
启用 Hive 合并小文件优化
设置以下参数:
–开启合并 map reduce 任务输出的小文件
set hive.merge.mapredfiles=true;
–合并后的文件大小
set hive.merge.size.per.task=256000000;
–触发小文件合并任务的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并
set hive.merge.smallfiles.avgsize=16000000;
这样输出文件就合并为一个了

其他优化
CBO 优化
CBO 是指 Cost based Optimizer,即基于计算成本的优化。
在 Hive 中,计算成本模型考虑到了:数据的行数、CPU、本地 IO、HDFS IO、网络 IO 等方面。Hive 会计算同一 SQL 语句的不同执行计划的计算成本,并选出成本最低的执行计划。目前 CBO 在 hive 的 MR 引擎下主要用于 join 的优化,例如多表 join 的 join 顺序。
相关参数为:
–是否启用 cbo 优化
set hive.cbo.enable=true;
1)示例 HQL
hive (default)> select * from order_detail od
join product_info product on od.product_id=product.id
join province_info province on od.province_id=province.id;
2)关闭 CBO 优化
–关闭 cbo 优化
set hive.cbo.enable=false;
–为了测试效果更加直观,关闭 map join 自动转换
set hive.auto.convert.join=false;
根据执行计划,可以看出,三张表的 join 顺序如下:

3)开启 CBO 优化
–开启 cbo 优化
set hive.cbo.enable=true;
–为了测试效果更加直观,关闭 map join 自动转换
set hive.auto.convert.join=false;
根据执行计划,可以看出,三张表的 join 顺序如下:
CBO 优化对于执行计划中 join 顺序是有影响的,其之所以会将 province_info 的 join 顺序提前,是因为 province info 的数据量较小,将其提前,会有更大的概率使得中间结果的数据量变小,从而使整个计算任务的数据量减小,也就是使计算成本变小。
谓词下推
谓词下推(predicate pushdown)是指,尽量将过滤操作前移,以减少后续计算步骤的数据量。
相关参数为:
–是否启动谓词下推(predicate pushdown)优化
set hive.optimize.ppd = true;
需要注意的是:CBO 优化也会完成一部分的谓词下推优化工作,因为在执行计划中,谓词越靠前,整个计划的计算成本就会越低。

矢量化查询
Hive 的矢量化查询优化,依赖于 CPU 的矢量化计算,CPU 的矢量化计算的基本原理如下图:

相关参数如下:
set hive.vectorized.execution.enabled=true;
若执行计划中,出现“Execution mode: vectorized”字样,即表明使用了矢量化计算。
Fetch 抓取
Fetch 抓取是指,Hive 中对某些情况的查询可以不必使用 MapReduce 计算。例如:select * from emp;在这种情况下,Hive 可以简单地读取 emp 对应的存储目录下的文件,然后输出查询结果到控制台。
相关参数如下:
–是否在特定场景转换为 fetch 任务
–设置为 none 表示不转换
–设置为 minimal 表示支持 select *,分区字段过滤,Limit 等
–设置为 more 表示支持 select 任意字段,包括函数,过滤,和 limit 等
set hive.fetch.task.conversion=more;
本地模式
大多数的 Hadoop Job 是需要 Hadoop 提供的完整的可扩展性来处理大数据集的。不过,有时 Hive 的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际 job 的执行时间要多的多。对于大多数这种情况,Hive 可以通过本地模式在单台机器上处理所有的任务,不必提交到 Yarn。对于小数据集,执行时间可以明显被缩短。
相关参数如下:
–开启自动转换为本地模式
set hive.exec.mode.local.auto=true;
–设置 local MapReduce 的最大输入数据量,当输入数据量小于这个值时采用 local MapReduce 的方式,默认为 134217728,即 128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
–设置 local MapReduce 的最大输入文件个数,当输入文件个数小于这个值时采用 local MapReduce 的方式,默认为 4
set hive.exec.mode.local.auto.input.files.max=10;
并行执行
Hive 会将一个 SQL 语句转化成一个或者多个 Stage,每个 Stage 对应一个 MR Job。默认情况下,Hive 同时只会执行一个 Stage。但是某 SQL 语句可能会包含多个 Stage,但这多个 Stage 可能并非完全互相依赖,也就是说有些 Stage 是可以并行执行的。此处提到的并行执行就是指这些 Stage 的并行执行。
相关参数如下:
–启用并行执行优化
set hive.exec.parallel=true;
–同一个 sql 允许最大并行度,默认为 8
set hive.exec.parallel.thread.number=8;
严格模式
Hive 可以通过设置某些参数防止危险操作:
1)分区表不使用分区过滤
将 hive.strict.checks.no.partition.filter 设置为 true 时,对于分区表,除非 where 语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
2)使用 order by 没有 limit 过滤
将 hive.strict.checks.orderby.no.limit 设置为 true 时,对于使用了 order by 语句的查询,要求必须使用 limit 语句。因为 order by 为了执行排序过程会将所有的结果数据分发到同一个 Reduce 中进行处理,强制要求用户增加这个 limit 语句可以防止 Reduce 额外执行很长一段时间(开启了 limit 可以在数据进入到 Reduce 之前就减少一部分数据)。
3)笛卡尔积
将 hive.strict.checks.cartesian.product 设置为 true 时,会限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行 JOIN 查询的时候不使用 ON 语句而是使用 where 语句,这样关系数据库的执行优化器就可以高效地将 WHERE 语句转化成那个 ON 语句。不幸的是,Hive 并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。

