Spark中排序--前缀排序prefixSort

背景

最近偶然间看到了一篇文章一文掌握 Velox orderby 算子的排序算法,里面主要说到了Velox PrefixSort怎么用排序算饭加速大数据的排序的,其中有说到:

排序的过程,主要考虑霎三件事情:
1. 选择比较函数
2. 选择排序算法
3. 排序过程中的数据移动,移动数据或者移动指针?

最让我深有感触的是 这里面涉及到的比较函数,这里的主要思想就是:

把所有的类型的比较(无论是字符串还是整数等),都转换为二进制字符串的比较,那么这在比较的速度上就会能够充分利用硬件资源,使得加速。其中就会涉及到各个字段类型的规范化: 按照order by的顺序依次进行规范,如果遇到不能规范的字段类型,则后续的规范直接中断。

当然具体的Velox的代码我是没有去看,但是我们可以解析一下Spark中的Sort是怎么实现的,作为大数据的标杆组件,我们可以看一下,本文基于Spark 3.5

分析

直接切入到SortExec类,其中有个createSorter 方法,这里会构建排序函数,我们这里的重点不是排序函数,而是比较函数的实现:

    val ordering = RowOrdering.create(sortOrder, output)

    // The comparator for comparing prefix
    val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
    val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)

    val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
      SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)

    // The generator for prefix
    val prefixExpr = SortPrefix(boundSortExpression)
    val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
    val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer {
      private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix
      override def computePrefix(row: InternalRow):
          UnsafeExternalRowSorter.PrefixComputer.Prefix = {
        val prefix = prefixProjection.apply(row)
        result.isNull = prefix.isNullAt(0)
        result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0)
        result
      }
    }

    val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
    rowSorter = UnsafeExternalRowSorter.create(
      schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)


先说 UnsafeExternalRowSorter中 内存排序(UnsafeInMemorySorter)最基本的思想:先根据前缀比较算法进行比较,如果相等的话,则再遍历实际数据的指针去获取真正的数据进行比较,这种可以规避随机内存读取从而提交缓存的命中率,进而提高比较的速度。

再说 这里自定义的前缀比较:

  • BindReferences.bindReference(sortOrder.head, output) 这里指选择第一个排序的字段作为前缀比较的类型

  • val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
    这里会根据排序的字段类型选择出对应的排序方法:

        sortOrder.dataType match {
        case StringType => stringPrefixComparator(sortOrder)
        case BinaryType => binaryPrefixComparator(sortOrder)
        case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType |
            TimestampNTZType | _: AnsiIntervalType =>
          longPrefixComparator(sortOrder)
        case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS =>
          longPrefixComparator(sortOrder)
        case FloatType | DoubleType => doublePrefixComparator(sortOrder)
        case dt: DecimalType => doublePrefixComparator(sortOrder)
        case _ => NoOpPrefixComparator
      }
    
    

    最后 就只有两种前缀比较器 UnsignedPrefixComparator SignedPrefixComparator NoOpPrefixComparator
    对于 String 以及Binary double float 这种会选择无符号的前缀比较
    对于 double等基本数据类型会选择 有符号的前缀比较
    这里为什么会这么选择,其实是跟内部的类型存储有关以及 prefixExpr 和 prefixComputer选择的Prefix有关

  • 计算前缀
    主要涉及以下

      val prefixExpr = SortPrefix(boundSortExpression)
      val prefixProjection = UnsafeProjection.create(Seq(prefixExpr))
      val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer 
    

    这里主要利用代码生成的方式,通过prefixProjection.apply(row) 这只拿了第一个sortOrder的表达式,所以是以第一个sort表达式来获取前缀比较的。
    其中 SortPrefix中的方法calcPrefix会根据Spark的内部类型,获取Long类型的可以用于比较的值,所以我们可以看到在prefixComputercomputePrefix方法中可以通过getLong(0)来获取对应的值。这样在后续内存排序(UnsafeInMemorySorter)中就可以用该long值进行排序。

其他

这里特别说一下:两种类型的BinaryType(对应内部的类型为Array[Byte]) 和 StringType(对应的内部的类型为UTF8String) 获取prefix的.
注意UTF8String 内部也是以 Array[Byte]存储的.
这两个都是通过ByteArray.getPrefix方法来获取对应的值的。
其中 Platform.BYTE_ARRAY_OFFSET调用UNSAFE.arrayBaseOffset(byte[].class) 获取数组第一个元素相对于数组起始地址的偏移量.

  public static long getPrefix(byte[] bytes) {
    if (bytes == null) {
      return 0L;
    }
    return getPrefix(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length);
  }

  static long getPrefix(Object base, long offset, int numBytes) {

getPrefix 这个方法从字节数组取numBytes 个字节数之后组成Long类型返回。

其实byte的内部的底层也是按照数字存储的,取值范围是[-128,127],所以在底层转换为long也是可以的。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值