如何发现Spark数据倾斜

2018年12月10日

现象

  • 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,993个task都在1分钟之内执行完了,但是剩余两7个task却要一两个小时。
  • 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,一般是业务代码造成的。这种情况比较少见。

原理

在进行shuffle的时候,会将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行 聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。

比如大部分key对应10条数据,但是个别key却对应了100万 条数据,那么大部分task可能就只会分配到10条数据,然后1秒钟就运行完了;但是个别task可能分配到了100万数据,要运行一两个小时。因此,整个Spark作业的运行进度是由运行时间最长的那个task决定的。

下图就是一个例子:hello这个key,在三个节点上对应了总共7条数据,这些数据都会被拉取到同一个task中进行处理;而world 和you这两个key分别才对应1条数据,所以另外两个task只要分别处理1条数据即可。此时第一个task的运行时间可能是另外两个task的7倍, 而整个stage的运行速度也由运行最慢的那个task所决定。

 

 

如何定位?

数据倾斜只会发生在shuffle过程中。可能会触发shuffle操作的算子:distinct、 groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时, 可能就是代码中使用了这些算子中的某一个所导致的。

 

发现哪个task执行特别慢:

首先要看的,就是数据倾斜发生在第几个stage中。

可以通过Spark Web UI来查看当前运行到了第几个stage。此外,无论是使用yarn-client模式还是yarn-cluster模式,我们都可以在Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。

比如下图中,倒数第三列显示了每个task的运行时间。明显可以看到,有的task运行特别快,只需要几秒钟就可以运行完;而有的task运行特别 慢,需要几分钟才能运行完,此时单从运行时间上看就已经能够确定发生数据倾斜了。

此外,倒数第一列显示了每个task处理的数据量,明显可以看到,运行时 间特别短的task只需要处理几百KB的数据即可,而运行时间特别长的task需要处理几千KB的数据,处理的数据量差了10倍。此时更加能够确定是发生 了数据倾斜

 

发现哪个stage执行特别慢:

知道数据倾斜发生在哪一个stage之后,就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分 代码中肯定会有一个shuffle类算子。

一个相对简单实用 的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

如下代码所示:

----------------stage0-----------------------
val conf = new SparkConf()
val sc = new SparkContext(conf)

val lines = sc.textFile("hdfs://...")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))

----------------stage1-----------------------
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.collect().foreach(println(_))

比如,通过在Spark Web UI或者本地log中发现,stage1的某几个task执行得特别慢,判定stage1出现了数据倾斜,那么就可以回到代码中定位出stage1主要包 括了reduceByKey这个shuffle类算子,此时就可以确定是由reduceByKey算子导致的数据倾斜问题

没有评论

发表评论

邮箱地址不会被公开。 必填项已用*标注