1 cache(), persist()和unpersist()

原文链接:Spark DataFrame Cache and Persist Explained

spark中DataFrame或Dataset里的cache()方法默认存储等级为MEMORY_AND_DISK,这跟RDD.cache()存储等级MEMORY_ONLY是不一样的。理由是重新计算内存中的表的代价是昂贵的。MEMORY_AND_DISK表示如果内存中缓存不下,就存在磁盘上。

spark的dataset类中的cache()方法内部调用的是persist()方法。cache()在spark中是懒惰的方法,必须触发了action操作才会被执行。

val dfPersist = df.cache()#默认存储等级为MEMORY_AND_DISK

如果直接使用persist()方法,可以选择存储等级,存储等级有MEMORY_ONLY,MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY, MEMORY_ONLY_2,MEMORY_AND_DISK_2 等。

val dfPersist = df.persist()#默认存储等级为MEMORY_AND_DISK
dfPersist.show(false)

val dfPersist = df.persist(StorageLevel.MEMORY_ONLY)#设置缓存的存储等级为MEMORY_ONLY
dfPersist.show(false)

spark会自动检测每个persist()和cache()操作,它会检测各个结点的使用情况,如果数据不再使用会把持久化(persisted)的数据删掉,依据的是最近最少使用(least-recently-used LRU)算法。你也可以手动使用unpersist()将持久化的数据从内存和磁盘中删掉。

val dfPersist = dfPersist.unpersist()

2 关于spark persist()的两个坑

原文链接:关于spark persist()的两个坑

2.1 persist() + show()不能缓存全部数据

对于一个有100个分区的数据,假如你对其persist()后进行show(10),那么spark只会把第一个分区缓存起来,show前10条数据。如果你show的数据量超过了一个分区的大小,那么spark会多缓存一些分区。

因此persist()后如果希望数据全部都缓存到内存中,应对每个分区都执行action操作,如进行count()。

2.2 unpersist()一个rdd时会同时unpersist()子RDD

在spark 2.4之前的版本,当你创建了一个DataFrame a,同时由a得到b,并且把2个DataFrame都缓存起来了,如果执行a.unpersist(),会把b也自动unpersist()。

这个潜在的坑会使得你的spark程序增加了大量的计算量,鄙人之前由于计算数据量大,把代码分成了八步,每一步结束后都对当前df进行了persist() + count(),然后对父rdd进行了unpersist,这样导致了第一步重复计算了8次、第二步重复计算了7次、…、第七步重复计算了2次,通过查看spark UI才发现了此问题。

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐