数据倾斜定义

定义:对于集群系统,一般缓存是分布式的,即不同节点负责一定范围的缓存数据。我们把缓存数据分散度不够,导致大量的缓存数据集中到了一台或者几台服务节点上,称为数据倾斜。一般来说数据倾斜是由于负载均衡实施的效果不好引起的。
举例:如果有10亿数据,一台电脑可能要10小时,现在集群有10台,可能1小时就够了,但是有可能大量的数据集中到一台或几台上,要5小时,发生了数据倾斜。
数据倾斜过程:数据经过 map后,由于不同key 的数据量分布不均,在shuffle 阶段中通过 partition 将相同的 key 的数据打上发往同一个 reducer 的标记,然后开始 spill (溢写)写入磁盘,最后merge成最终map阶段输出文件。
如此一来,数据量很大的key 将发往同一个 reducer,超出了节点的计算能力,等待时间超出了可接受范围;

数据倾斜表现

如何判断是否存在数据倾斜:
1、分析节点资源管理器,如果大部分节点已经执行完成,而个别节点长时间执行不完,很可能发生了数据倾斜。

2、分析执行日志,作业在reduce阶段停留在99%,很长时间完成不了,很可能发生了数据倾斜。

数据倾斜原因

  • key值分布不均匀
    • 数据中存在大量相同key值
    • 数据中的key值存在大量异常值和空值
  • 业务数据本身特性
    • 例如某个分公司订单量大幅提升几十倍甚至几百倍,对该分公司的订单统计聚合时,容易发生数据倾斜。
  • 某些SQL语句本身导致数据倾斜
    • group by 维度过少,某个值数量较多。
    • count distinct 特殊值较多
    • join 一个key值集中的小表
    • 两个表中关联字段存在大量空值或关联字段数据类型不一致

数据倾斜解决方案

1、group by导致数据倾斜

两阶段聚合,放粗粒度:需要聚合的key加一个随机数的前后缀,按照加上前后缀的key分组聚合一次,之后再按照原始的key分组聚合一次。
详细过程:生成的查询计划有两 个 MapReduce 任务。
在第一个MapReduce 中,map的输出结果集合会随机分布到 reduce 中,每个 reduce做部分聚合操作,并输出结果。相同的 Group By Key 有可能分发到不同的reduce中,从而达到负载均衡的目的;
第二个 MapReduce 任务再根据预处理的数据结果按照 Group By Key 分布到 reduce中(这个过程可以保证相同的 Group By Key 分布到同一个 reduce中),最后完成最终的聚合操作。

假设 key = 水果
select count(substr(a.key,1,2)) as key
from(
	select concat(key,'_',cast(round(10*rand())+1 as string)) tmp
	from table
	group by tmp
)a
group by key

2、count(distinct)特殊值较多

采用count() group by的方式来替换count(distinct)

select count(distinct a) from test ;
select count x.a 
from (select a from test group by a ) x 

select a, count(distinct b) as c from tbl group by a;
select a, count(*) as c from (select a, b from tbl group by a, b) group by a;

3、join操作

  • 小表和大表join
    reduce join 改为 map join:将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD 的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
    设置自动选择MapJoin set hive.auto.convert.join = true;默认为true。
    reduce join: 先将所有相同的key,对应的values,汇聚到一个task中,然后再进行join。
    map reduce:broadcast出去那个小表的数据以后,就会在每个executor的block manager中都驻留一份+map算子来实现与join同样的效果。不会发生shuffe,从根本上杜绝了join操作可能导致的数据倾斜的问题;
  • 大表join大表
    将有大表中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(笛卡尔积,相当于将其数据增到到原来的N倍,N即为随机前缀的总个数)然后将二者Join后去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。

4、空值或数据类型不一致所致

  • 解决空值导致数据倾斜
1.在查询的时候,过滤掉所有为NULL的数据,比如:
SELECT * FROM log a
JOIN bmw_users b ON a.user_id IS NOT NULL AND a.user_id = b.user_id
UNION ALL
SELECT *FROM log a WHERE a.user_id IS NULL;

2.查询出空值并给其赋上随机数,避免了key值为空(数据倾斜中常用的一种技巧)
SELECT *FROM log a
LEFT JOIN bmw_users b ON 
CASE WHEN a.user_id IS NULL THEN concat(‘dp_hive’, rand()) ELSE a.user_id END = b.user_id;
  • 关联数据类型不一致产生数据倾斜
    一张表 s8_log,每个商品一条记录,要和商品表关联。s8_log 中有字符串商品 id,也有数字的商品 id。字符串商品 id 类型是 string 的,但商品中的数字 id 是 bigint 的。
    问题的原因是把 s8_log 的商品 id 转成数字 id 做 Hash(数字的 Hash 值为其本身,相同的字符串的 Hash 也不同)来分配 Reducer,所以相同字符串 id 的 s8_log,都到一个 Reducer 上了。
-- 把数字类型转换成字符串类型
SELECT *
FROM s8_log a
LEFT JOIN r_auction_auctions b ON a.auction_id = CAST(b.auction_id AS string);

5、优化in/exists

hive1.2.1也支持in/exists操作,但还是推荐使用hive的一个高效替代方案:left semi join

6、排序选择

cluster by: 对同一字段分桶并排序,不能和sort by连用;
distribute by + sort by: 分桶,保证同一字段值只存在一个结果文件当中,结合sort by 保证每个reduceTask结果有序;
sort by: 单机排序,单个reduce结果有序;
order by:全局排序,缺陷是只能使用一个reduce;

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐