Spark 的 full outer join 目前存在一个问题：在数据倾斜时会导致 Executor OOM。具体的问题描述可参见 SPARK-24985。
简单转述如下：问题出在 SortMergeJoinExec 类 doExecute 方法中的以下代码块：
case FullOuter =>
  // Full outer join branch of the join-type match in SortMergeJoinExec.doExecute.
  // All-null rows sized to the left/right output schemas; presumably used to pad
  // the missing side when a row has no partner on the other side — confirm.
  val leftNullRow = new GenericInternalRow(left.output.length)
  val rightNullRow = new GenericInternalRow(right.output.length)
  // As a sort-merge scanner it consumes both key-sorted inputs. For each join key
  // it buffers every left and right row with that key in memory (see
  // findMatchingRows), with no size limit, so a heavily skewed key can exhaust
  // executor memory (SPARK-24985).
  val smjScanner = new SortMergeFullOuterJoinScanner(
    leftKeyGenerator = createLeftKeyGenerator(),
    rightKeyGenerator = createRightKeyGenerator(),
    keyOrdering,
    leftIter = RowIterator.fromScala(leftIter),
    rightIter = RowIterator.fromScala(rightIter),
    boundCondition,
    leftNullRow,
    rightNullRow)
  // Drives the scanner and projects each joined row with resultProj; numOutputRows
  // is presumably the output-row metric. .toScala presumably converts the
  // RowIterator back into a Scala Iterator (inverse of RowIterator.fromScala).
  new FullOuterIterator(
    smjScanner,
    resultProj,
    numOutputRows).toScala
其中，SortMergeFullOuterJoinScanner 在迭代时，会把左右两侧与当前 join key 匹配的所有行都拷贝并缓存到内存中（leftMatches / rightMatches），而这部分内存没有任何上限。当某个 join key 严重倾斜时，该 key 对应的所有行会被一次性装入内存，从而导致 OOM。具体处理逻辑如下：
/**
 * Buffers every left and right row whose join key equals `matchingKey`.
 *
 * Clears `leftMatches` / `rightMatches` and resets `leftIndex` / `rightIndex`
 * to 0. It then copies all consecutive left rows with that key, followed by
 * all consecutive right rows with that key, into the two buffers. Only
 * consecutive rows are collected, so this relies on both inputs being sorted
 * by `keyOrdering`.
 *
 * NOTE: the buffers have no size limit. If one key is heavily skewed, every
 * row for that key is held in memory at once. This is the Executor OOM
 * described in SPARK-24985.
 *
 * @param matchingKey the join key whose rows are collected from both sides
 */
private def findMatchingRows(matchingKey: InternalRow): Unit = {
  leftMatches.clear()
  rightMatches.clear()
  leftIndex = 0
  rightIndex = 0
  // A null key appears to signal that the left input is exhausted
  // (assumption — confirm against advancedLeft()).
  while (leftRowKey != null && keyOrdering.compare(leftRowKey, matchingKey) == 0) {
    // copy(): presumably the input iterator reuses its row object, so a
    // defensive copy is required before buffering — confirm.
    leftMatches += leftRow.copy()
    advancedLeft()
  }
  while (rightRowKey != null && keyOrdering.compare(rightRowKey, matchingKey) == 0) {
    rightMatches += rightRow.copy()
    advancedRight()
  }
  // Size the "matched" bitsets to the number of buffered rows. Reuse the
  // existing bitset (via clearUntil) when its capacity suffices; otherwise
  // allocate a new one. They presumably record which buffered rows found a
  // partner, so that unmatched rows can be emitted null-padded — confirm
  // with the caller.
  if (leftMatches.size <= leftMatched.capacity) {
    leftMatched.clearUntil(leftMatches.size)
  } else {
    leftMatched = new BitSet(leftMatches.size)
  }
  if (rightMatches.size <= rightMatched.capacity) {
    rightMatched.clearUntil(rightMatches.size)
  } else {
    rightMatched = new BitSet(rightMatches.size)
  }
}