深入理解 Spark Partitioning

RDD是一个大型的数据项集合。在我们的大数据处理中,这种大型数据项集合在单机环境下是放不下的,因此,它们被划分在多个节点上。当然,RDD的分区操作是由Spark自动完成的。

RDD Partitioning 属性

Property Description
partitions returns an array with all partition references of source RDD
partitions.size gives number of partitions of source RDD
partitioner returns Partitioner if any, HashPartitioner, RangePartitioner,CustomPartitioner.
defaultParallelism returns default level of parallelism defined on SparkContext.By default it is number of cores available to application.

Spark用RDD的partitioner来决定分区算法。如果partitioner为None的话,则随机均等分区

影响 Partitioning 的因素

  • 可用资源 - CPU cores
  • 外部数据源 - 本地的collections、Cassandra table 或 HDFS file的数量决定分区数
  • 生成RDD的Transformations操作

Block、Task 和 Partition 的关系

输入可能以多个文件的形式存储在HDFS上,每个File都包含了很多块,称为Block

当Spark读取这些文件作为输入时,会根据具体数据格式对应的InputFormat进行解析,一般是将若干个Block合并成一个输入分片,称为InputSplit,注意InputSplit不能跨越文件。

随后将为这些输入分片生成具体的Task。InputSplit与Task是一一对应的关系。

随后这些具体的Task每个都会被分配到集群上的某个节点的某个Executor去执行。

  • 每个节点可以起一个或多个Executor。
  • 每个Executor由若干core组成,每个Executor的每个core一次只能执行一个Task。
  • 每个Task执行的结果就是生成了目标RDD的一个partiton

注意:这里的core是虚拟的core而不是机器的物理CPU核,可以理解为就是Executor的一个工作线程。

而 Task被执行的并发度 = Executor数目 * 每个Executor核数。

至于partition的数目:

  • 对于数据读入阶段,例如sc.textFile,输入文件被划分为多少InputSplit就会需要多少初始Task。
  • 在Map阶段partition数目保持不变。
  • 在Reduce阶段,RDD的聚合会触发shuffle操作,聚合后的RDD的partition数目跟具体操作有关,例如repartition操作会聚合成指定分区数,还有一些算子是可配置的。

默认Partitioning操作

默认的分区操作取决于RDD是如何创建的,比如创建于外部数据源,像Cassandra table, HDFS file等,或者通过transforming从另一个RDD转化而来

API Call partitions.size partitioner
sc.parallelize(…) sc.defaultParallelism NONE
sc.cassandraTable(…) sc.defaultParallelism or data-size/64 MBs , whichever is greater NONE
sc.textFile(…) sc.defaultParallelism or number of file blocks , whichever is greater NONE
filter(),map(),flatMap(),distinct() same as parent RDD NONE except filter preserve parent RDD’s partitioner
rdd.union(otherRDD) rdd.partitions.size + otherRDD. partitions.size NONE
rdd.intersection(otherRDD) max(rdd.partitions.size, otherRDD. partitions.size) NONE
rdd.subtract(otherRDD) rdd.partitions.size NONE
rdd.cartesian(otherRDD) rdd.partitions.size * otherRDD. partitions.size NONE
reduceByKey(),foldByKey(),combineByKey(),groupByKey() same as parent RDD HashPartitioner
sortByKey() same as parent RDD RangePartitioner
mapValues(),flatMapValues() same as parent RDD parent RDD’s partitioner
cogroup(), join(), ,leftOuterJoin(), rightOuterJoin() depends upon certain input properties of two RDDs involved. HashPartitioner

多少个分区比较好?

拥有太少或太多分区数,都有明显的优缺点。因此,建议根据集群的配置和需求来谨慎选择。

分区数太少的缺点:

  • 并发少 - 没有充分地利用并发性,有些工作节点空闲出来
  • 数据倾斜和不恰当的资源利用 - 如果数据在某个分区发生倾斜,造成该工作节点的计算任务远大于其他节点,进而出现资源问题

分区数太多的缺点:

  • 任务的调度时间可能会超过实际执行时间

推荐的分区规则:

  • 一般来说分区数在100-100000之间,取决于集群的大小和数据
  • 下限 - cores的2倍
  • 上限 - 每个task的执行时间最好在100+ms,如果执行时间过短,说明分区数据量太小,程序将花费更多的时间在调度任务上

我们可以调试分区参数并在Spark Application UI上监控执行和调度时间,从而把程序调到更优

优化总结

API

  • Task被执行的并发度 = Executor数目 * 每个Executor核数

  • Task与Partition一一对应,Partition数最好为并发度的2~3倍。因此最好设置 spark.default.parallelism = executors * cores * 2

外部数据源

Spark 程序

当从HDFS读文件时, 默认的 spark.hadoop.mapred.max.split.size = 269000000(256MB)

而实际的Split size = max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size))

因此,当读取操作需要提升并行度时,可以调低 mapred.max.split.size ,比如16MB;反之亦然。

Spark Streming 程序

spark.streaming.blockInterval 这个参数用来设置Spark Streaming里Stream Receiver生成Block的时间间隔,默认为200ms。

具体的行为表现是:Receiver所接收的数据,每隔这里设定的时间间隔,就从Buffer中生成一个StreamBlock放进队列,等待进一步被存储到BlockManager中供后续计算过程使用。理论上来说,为了每个Streaming Batch 间隔里的数据是均匀的,这个时间间隔当然应该能被Batch的间隔时间长度所整除。

每个Receiver 产生的的input-block数为: batchInterval * 1000 / blockInterval

因此,建议 blockInterval = Receivers * batchInterval * 1000 / Parallelism,这样下来,每个batch产生的StreamBlock数:

StreamBlocks = Receivers * batchInterval * 1000 / blockInterval = Receivers * batchInterval * 1000 / (Receivers * batchInterval * 1000 / Parallelism) = Parallelism

优化前:spark.streaming.blockInterval 取默认值,partition和task数太多

优化后:设置spark.streaming.blockInterval = Receivers * batchInterval * 1000 / Parallelism,分区数明显变少,执行效率大大提高

results matching ""

    No results matching ""