一、数据倾斜原理

大数据开发,很有可能会遇到数据倾斜的问题,要想解决数据倾斜,首先要理解什么是数据倾斜,以及产生数据倾斜的原因。

数据倾斜主要是指:主要就是数据在每个节点上的分配不均,导致个别节点处理速度很慢,使得程序迟迟不能运行结束。主要表现为:在mapreduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key中的的条数比其他key要多很多,这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完。如何将数据均匀的分配到各个reduce节点中,就是解决数据倾斜的根本所在

二、Spark中数据倾斜解决

以下针对spark具体计算场景,给出数据倾斜解决方案:
场 景当RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,产生数据倾斜。
出现数据倾斜原因:
在上述分组场景中,具有相同的key的数据就会被分配到同一个组当中,从而分配到同一分区。如果某些相同key的数据量非常大,而其他是key的数据量相对较小,此时就可能产生比较严重的数据倾斜。
本方案通过两个阶段聚合:
阶段一:
先给每个key都打上一个随机数,比如10以内的随机数,比如(spark,1) (spark, 1) (spark, 1) (spark, 1),就会变成(1_spark, 1) (1_spark, 1) (2_spark, 1) (2_spark, 1)。打上随机数以后,原先一样的key就变成不一样的了。然后对数据进行reduceByKey等聚合操作,局部聚合结果变成了(1_spark, 2) (2_spark, 2)。
阶段二:
基于阶段一局部聚合的数据,将各个key的前缀给去掉,就会变成(spark,2)(spark,2),再次进行全局聚合操作,得到最终结果,比如(spark, 4)。

三、Hive中数据倾斜

1.group by导致数据倾斜

(1)、设置hive.map.aggr:默认为true,在map端做聚合,推荐使用

(2)、设置hive.groupby.skewindata:reduce操作的时候,相同key值并不是都给同一个reduce,而是随机分发到各个reduece做聚合。这个参数其实跟hive.map.aggr做的类似,只是在reduce端做,要额外启动一轮job,不推荐使用

(3)、优化sql语句

有个t表,数据量很大,假如字段a代表的性别,那么只有2个值,对a进行group by操作,所有聚合运行将会落在两个节点上。优化方法,先group by a b,b需要一个比较分散的值,比如班级或者年级,得到一个较小规模的中间结果数据,再对中间结果group by a。

改写前

select a, count(distinct b) as c from t group by a;

改写后

select a, count(*) as c from (select a, b from t group by a, b) group by a;

2.join操作导致数据倾斜

select * from logs a join users b on a.user_id = b.user_id;

日志表有大量未登陆用户的数据,即user_id为0,reduce时候,某个节将会其他节点多出大量数据,形成单点压力。

(1)、设置hive.optimize.skewjoin和hive.skewjoin.key参数

其原理把这种user_id为0的特殊值先不在reduce端计算掉,而是先写入hdfs,然后启动一轮map join专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。hive.skewjoin.key设置值比如是1万,那么超过1万条记录的值就是特殊值。

(2)、 优化sql,特殊值分开处理

user_id=0的单独做join,这样user_id=0转化成map join,user_id!=0是没有数据倾斜的普通join。

   select
        *
    from (select * from logs where user_id = 0) a
    join (select * from users where user_id = 0) b on a.user_id = b.user_id
   union all
   select * from logs a join users b on a.user_id <> 0 and a.user_id = b.user_id;

(3)、优化sql,特殊值赋予新key

   select * from logs a
    left outer join users b
    on
        case
            when a.user_id is null
            then concat('prefix_', rand())
            else a.user_id
        end = b.user_id;

(4)、优化sql,关联key随机打散

   select a.*,b.*
    from (select *, cast(rand() * 10 as int) as r_id from logs) a
    join (select *, r_id from items lateral view explode(range_list(1, 10)) rl as r_id) b
    on a.item_id = b.item_id and a.r_id = b.r_id

对行为表的每条记录生成一个1-10的随机整数,对于item属性表,每个item生成10条记录,随机key分别也是1-10,这样就能保证行为表关联上属性表。其中range_list(1,10)代表用udf实现的一个返回1-10整数序列的方法。这个做法是一个解决join倾斜比较根本性的通用思路,就是如何用随机数将key进行分散。

3.不同类型关联导致数据倾斜
用户表中user_id字段为int,logs表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。需要把数字类型统一转换成字符串类型。

select    * from users a
left outer join logs b on a.usr_id = cast(b.user_id as string)

4.利用map join解决数据倾斜问题

(1)、大小表关联

 select * from users as a join logs b  on a.user_id = b.user_id

如果users表只有100行数据,logs表有1亿条数据且数据倾斜特别严重,reduce过程中同样会遇到数据倾斜问题。

利用map join,会把小表全部读入内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map是进行了join操作,省去了reduce运行的效率也会高很多。

select /*+ mapjoin(a)*/ from users as a join logs b  on a.user_id = b.user_id

(2)、where条件中存在不等式造成的笛卡尔积

map join还有一个优势,能够进行不等连接的join操作,如果将不等条件写在where中,那么mapreduce过程中会进行笛卡尔积,运行效率特别低,如果使用map join操作,在map的过程中就完成了不等值的join操作,效率会高很多。

select /*+ mapjoin(a)*/ from A join B where A.a>B.a

(3)、小表不大大表不小

select * from log a left outer join users b on a.user_id = b.user_id;

users 表有 600w+ 的记录,把 users 分发到所有的 map 上也是个不小的开销,而且 map join 不支持这么大的小表。如果用普通的 join,又会碰到数据倾斜的问题。

log里user_id有上百万个,但每日的会员uv不会太多,有交易的会员不会太多,有点击的会员不会太多,有佣金的会员不会太多等等,可以利用这一特性,通过map join进行优化

select
    /*+mapjoin(x)*/
    *
from
    logs a
left outer join
    (
        select
            /*+mapjoin(c)*/
            d.*
        from
            (
                select distinct user_id from logs
            )
            c
        join users d
        on
            c.user_id = d.user_id
    )
    x on a.user_id = b.user_id; 
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐