Spark Tungsten-sort Based Shuffle 分析

  文/祝威廉(简书作者)

  原文链接:http://www.jianshu.com/p/d328c96aebfd

  著作权归作者所有,转载请联系作者获得授权,并标注“简书作者”。

  Tungsten-sort 算不得一个全新的shuffle 方案,它在特定场景下基于类似现有的Sort Based Shuffle处理流程,对内存/CPU/Cache使用做了非常大的优化。带来高效的同时,也就限定了自己的使用场景。如果Tungsten-sort 发现自己无法处理,则会自动使用 Sort Based Shuffle进行处理。

  前言

  看这篇文章前,建议你先简单看看Spark Sort Based Shuffle内存分析。

  Tungsten 中文是钨丝的意思。 Tungsten Project 是 Databricks 公司提出的对Spark优化内存和CPU使用的计划,该计划初期似乎对Spark SQL优化的最多。不过部分RDD API 还有Shuffle也因此受益。

  简述

  Tungsten-sort优化点主要在三个方面:

  直接在serialized binary data上sort而不是java objects,减少了memory的开销和GC的overhead。

  提供cache-efficient sorter,使用一个8bytes的指针,把排序转化成了一个指针数组的排序。

  spill的merge过程也无需反序列化即可完成

  这些优化的实现导致引入了一个新的内存管理模型,类似OS的Page,对应的实际数据结构为MemoryBlock,支持off-heap 以及 in-heap 两种模式。为了能够对Record 在这些MemoryBlock进行定位,引入了Pointer(指针)的概念。

  如果你还记得Sort Based Shuffle里存储数据的对象PartitionedAppendOnlyMap,这是一个放在JVM heap里普通对象,在Tungsten-sort中,他被替换成了类似操作系统内存页的对象。如果你无法申请到新的Page,这个时候就要执行spill操作,也就是写入到磁盘的操作。具体触发条件,和Sort Based Shuffle 也是类似的。

  开启条件

  Spark 默认开启的是Sort Based Shuffle,想要打开Tungsten-sort ,请设置

  spark.shuffle.manager=tungsten-sort

  对应的实现类是:

  org.apache.spark.shuffle.unsafe.UnsafeShuffleManager

  名字的来源是因为使用了大量JDK Sun Unsafe API。

  当且仅当下面条件都满足时,才会使用新的Shuffle方式:

  Shuffle dependency 不能带有aggregation 或者输出需要排序

  Shuffle 的序列化器需要是 KryoSerializer 或者 Spark SQL's 自定义的一些序列化方式.

  Shuffle 文件的数量不能大于 16777216

  序列化时,单条记录不能大于 128 MB

  可以看到,能使用的条件还是挺苛刻的。

  这些限制来源于哪里

  参看如下代码,page的大小:

  this.pageSizeBytes = (int) Math.min(

  PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,

  shuffleMemoryManager.pageSizeBytes());

  这就保证了页大小不超过PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES 的值,该值就被定义成了128M。

  而产生这个限制的具体设计原因,我们还要仔细分析下Tungsten的内存模型:

  

 

  来源于:https://github.com/hustnn/TungstenSecret/tree/master

  这张图其实画的是 alt="" width="946" height="807" />

 

  图片来源:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-core/

  (另外,值得注意的是,这张图里进行spill操作的同时检查内存可用而导致的Exeception 的bug 已经在1.5.1版本被修复了,忽略那条路径)

  内存是否充足的条件依然shuffleMemoryManager 来决定,也就是所有task shuffle 申请的Page内存总和不能大于下面的值:

  ExecutorHeapMemeory * 0.2 * 0.8

  上面的数字可通过下面两个配置来更改:

  spark.shuffle.memoryFraction=0.2

  spark.shuffle.safetyFraction=0.8

  UnsafeShuffleExternalSorter 负责申请内存,并且会生成该条记录最后的逻辑地址,也就前面提到的 Pointer。

  接着Record 会继续流转到UnsafeShuffleInMemorySorter中,这个对象维护了一个指针数组:

  private long[] pointerArray;

  数组的初始大小为 4096,后续如果不够了,则按每次两倍大小进行扩充。

  假设100万条记录,那么该数组大约是8M 左右,所以其实还是很小的。一旦spill后该UnsafeShuffleInMemorySorter就会被赋为null,被回收掉。

  我们回过头来看spill,其实逻辑上也异常简单了,UnsafeShuffleInMemorySorter 会返回一个迭代器,该迭代器粒度每个元素就是一个指针,然后到根据该指针可以拿到真实的record,然后写入到磁盘,因为这些record 在一开始进入UnsafeShuffleExternalSorter 就已经被序列化了,所以在这里就纯粹变成写字节数组了。形成的结构依然和Sort Based Shuffle 一致,一个文件里不同的partiton的数据用fileSegment来表示,对应的信息存在一个index文件里。