大数据技术的sparkcore(6)ranger分区原理-betway88必威官网_betway体育官网,必威app|欢迎进入

声明:本章节摘自别人文章,链接为https://blog.csdn.net/dmy1115143060/article/det游戏大厅ails/8262071iphone6splus5,尊重别人权益便是尊重自己。


1facetime、RangePartitioner原理简介

Spark引进RangePartitioner的意图是为了处理HashPartitioner所带来的分区歪斜问题,也即分区中包括的数据量不均衡问题。HashPartitioner选用哈希的办法将同一类型的Key分配到同一个Partition中,因而当某一或某几种类型数据量较多时,就会构成若干Partition中包括的数据过大问题,而在Job履行进程中,一个Pa挠脚心视频rtition对应一个Task,此刻就会使得某几个Task运转过慢。RangePartitioner根据抽样的思想来对数据进行分区。下图简略描绘了RangePartitioner的数据分区进程。

2、RangePartitioner源码详解

① 确认采样数据的规划:RangePartitioner默许对生情中情成的子RDD中的每个Partition收集20条数据,样本数据最大为1e6条。

// 一共需求收集大数据技能的sparkcore(6)ranger分区原理-betway88必威官网_betway体育官网,必威app|欢迎进入的样本数据个数,其间p陈某菠artitions代表终究子RDD中包括的Partition个数
val sampleSize = math.min(20.0 * partitions, 1e6)

② 确认父RDD中每个Partition中应当收集的数据量:这儿留意的是,对父RDD中颜表立是什么意思每个Partition收集的数据量会在均匀值上乘以3,这儿是为了后继在进行判别一个Partition是否发生了歪斜,当一个Partition包括的数据量超过了均匀值的三倍,此刻会以为该Parti康王tion发生了数据歪斜,会对该Partition调用sample算子进行从头采样。

// 被采样的RDD中每个partition应该被收集的数据,这儿将均匀收集每个partition中数据的3倍

val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt

③ 调用sketch办法进行数据采样:sketch办法回来的成果为<采样rdd的数据量>>。在sketch办法中会运用水塘抽样算法对待采样的各个分区进行数据采样,这儿选用水塘抽样算法是因为完成无法知道每个Partition中包括的数据量,而水塘抽样算法能够确保在不知道全体的数据量下依然能够等概率地抽取出每条数据。图4简略描绘了水塘抽样进程。

// 运用sketch办法进行数据抽样
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)

/**
* @param rdd 需求收集数据的RDD
* @大数据技能的sparkcore(6)ranger分区原理-betway88必威官网_betway体育官网,必威app|欢迎进入param sampleSizePerPartition 每个partition收集的数据量
* @retu绅士之家rn <采样rdd数据总量>>
*/
def sketch[K : ClassTag](
rdd: RDD[K],
sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
val seed = byteswap32(idx ^ (shift << 16))
// 运用水塘抽样算法进行抽样,抽样成果是个二元组
val (sample, n) = SamplingUtils.reservoirSampleAndCount(
iter, sampleSizePerPartition, seed)
Iterator((i保卫咱们的工作怎么做dx, n, sample))
}.collect()
val numItems = sketched.map(_._2).sum
(numItems, sketched)
}

④ 数据抽样完成后,需求对不均衡的Partition从头进行抽样,默许当Partition中包括的数据量大于均匀值的三倍时,该Partition是不均衡的。当采样完成后,运用样本容量和RDD中包括的数据总量,能够得到全体的一个数据采样率fraction。运用此采样率对不均衡的Partition调用sample算子从头进行抽样。

// 核算数据采样率
val fraction = math.min(sampleSize / math.max(numItems, 1L), 1.0)
// 寄存采样Key以及采样权重
val candidates = ArrayBuffer.empty[(K, Float)]
// 寄存不均衡的Partition
val imbalancedP欧美3dartitions = mutable.Set.empty[Int]
//(idx, n, sample)=> (partition id, 当时分区数据个数,当时partition的采样数据)
sketched.foreach { case (i大数据技能的sparkcore(6)ranger分区原理-betway88必威官网_betway体育官网,必威app|欢迎进入dx, n, sample) =>
// 当一个分区中的数据量大于均匀分区数据量的3倍时,以为该分区是歪斜的
if (fraction * n > sampleSiz大数据技能的sparkcore(6)ranger分区原理-betway88必威官网_betway体育官网,必威app|欢迎进入ePerPartition) {
imbalancedPartitions 光奶奶+= idx
}
// 在三倍之内的以为没有发生数据歪斜
else {
// 每条数据的采样距离 = 1/采样率 = 1/(sample.size/n.toDouble) = n.藤壶toDouble/sample.size
val weight = (n.toDouble / sample.length).toFloat
// 对当时分区中的采样数据,对每个key构成一个二元组
for (key <- sample) {
candidates += ((key, weight))
}
}
}
// 关于非均衡的partition,从头选用sample算子进行抽样
if (imbalancedPartitions.nonEmpty) {
val imbalanced = new PartitionPruningRDD(rdd.map(_._1), imbalancedPartitions.contains)
val seed = byteswap32(-rdd.id - 1)
val reSampled = imbalanced.sample(withReplacement = false, fraction, seed).collect()
val weight = (1.0 / fraction).toFloat
candidates大数据技能的sparkcore(6)ranger分区原理-betway88必威官网_betway体育官网,必威app|欢迎进入 ++= reSampled.map(x => (x, weight))
}

⑤ 确认各个Partition的Key规模:运用determineBounds办法来确认每个Partition中包括的Key规模,先对采样的Key进行排序,然后核算每个Parti正定tion均匀包括的Key权重,然后选用均匀分配原则来确认各个Partition包括的Key规模。如当时采样Key以及权重为:<1, 0.2>, <2, 0.1>, <3, 0.1>, <4, 0.3>, <5, 0.1>, <6, 0.3>,现在将其分配到3个Partition中,则每个Part聂海芬终究处理成果ition的均匀权重为:(0.2 + 0.1 + 0.1 + 0.3 + 0.1 + 0.3) / 3 = 0.36。此刻Partition1 ~ 3分配的Key以及总权重为

/**
* @param candidates 未按采样距离排序的抽样数据
* @param partitions 终究生成的RDD包括的分区个数
* @return 分区鸿沟
*/
def determineBounds[K : Ordering朕的小猫妃 : ClassTag](
candidates: ArrayBuffer[(K, Float)],
partitions: Int): Array[K] = {
val ordering = implicitly[Ordering[K]]
// 对样本依照key进行排序
val ordered = candidates.sortBy(_._1)
// 抽取的样本容量
val numCandidates = ordered.size
// 抽取臧健和的样本对应的采样距离之和
val sumWeights = ordered.map(_._2.toDouble).sum
// 均匀每个分区的步长
val step = sumWeights / partitions
var cumWeight = 0.0
var target = step
// 分区鸿沟值
val bounds = ArrayBuffer.empty[K]
var i = 0
var j = 0武松打虎
var previousBound = Option.empty[K]
while ((i < numCandidates) && (j < partitions - 1)) {
val (key, weight) = ord熟地的成效与效果ered(i)
cumWeight += weight
// 当时的采样距离小于target,持续迭代,也即这些key应该放在同一个partition中
if (cumWeight >= target) {
// Skip duplicate values.
if (previousBound.isEmpty || ordering.gt(key, previousBound.get)) {
bounds += key
target += step
j += 1
previousBound = Some(key)
}
}
i += 1
}
bounds.toArray
}

⑥ 核算每个Key地点Partition:当分区规模长度在128以内,运用顺序查找来确认Key地点的Partition,不然运用二分查找算法来确认Key地点的Partition。

/**
* 取得每个Key地点的partitionId
*/
def getPartition(key: Any): Int = {
val k = key.asInstanceOf[K]
var partition = 0
// 假如得到的规模不大于128,则进行顺序查找
if (rangeBounds.length <= 128) {
// If we have less than 128 partitions naive search
while (partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {
partition += 大数据技能的sparkcore(6)ranger分区原理-betway88必威官网_betway体育官网,必威app|欢迎进入1
}
}
// 规模大于128,则进行二分查找该key地点规模,即可得到该key地点的partitionId
else {
// Determine which binary search method to u大数据技能的sparkcore(6)ranger分区原理-betway88必威官网_betway体育官网,必威app|欢迎进入se only once.
partition = binarySearch(rangeBounds, k)
// binarySearch either returns the match location or -[insertion point]-1
if (partition < 0) {
partition = -partition-1
}
if (partition > rangeBounds.length) {
partition = rangeBounds.length
}
}
if (ascending) {
partition
} else {
rangeBounds.length - partition
}
}

原文链接:https://blog.csdn.net/dmy1115143060/article/details/82620715

(本文为系列文章,重视作者阅览其它部分内容,总有一篇是你短缺的,技能无止境,且学且爱惜!!!)

评论(0)