SPARK的计算向量化-spark本身的向量化

背景

我们知道,随着计算引擎战争的结束(SPARK赢得了离线处理的霸权),越来越多的公司致力于性能的优化,而引擎的优化,目前直指计算的向量化,
这片文章来说说spark本身对于向量化的实现。

spark本身的优化

我们都知道spark的Tungsten项目,这个项目中有一点就是Code Generation(代码生成)。代码生成除了消除虚函数的调用等功能外,其实在向量化这块也是做了处理的。
直接跳到ColumnarToRowExec代码:

val columnarBatchClz = classOf[ColumnarBatch].getName
    val batch = ctx.addMutableState(columnarBatchClz, "batch")
  ...
 val localIdx = ctx.freshName("localIdx")
    val localEnd = ctx.freshName("localEnd")
    val numRows = ctx.freshName("numRows")
    val shouldStop = if (parent.needStopCheck) {
      s"if (shouldStop()) { $idx = $rowidx + 1; return; }"
    } else {
      "// shouldStop check is eliminated"
    }
    s"""
       |if ($batch == null) {
       |  $nextBatchFuncName();
       |}
       |while ($limitNotReachedCond $batch != null) {
       |  int $numRows = $batch.numRows();
       |  int $localEnd = $numRows - $idx;
       |  for (int $localIdx = 0; $localIdx < $localEnd; $localIdx++) {
       |    int $rowidx = $idx + $localIdx;
       |    ${consume(ctx, columnsBatchInput).trim}
       |    $shouldStop
       |  }
       |  $idx = $numRows;
       |  $batch = null;
       |  $nextBatchFuncName();
       |}
     """.stripMargin

spark中向量化的核心就在于这块代码中,这块代码主要的就是ColumnarBatch,也就是列批,这种列批的数据结构,用FOR循环这种方式进行数据的访问,
这在JIT中会进行优化(优化成向量化)。
而这里还有一个重点就是:Parquet或者ORC这种列式存储,读取出来的时候,天然就是一个列批的数据结构,很方便做向量化操作。

但是,利用JIT进行向量化是有缺点的:
利用了JIT进行优化,这个是需要编译器追踪循环的次数的,如果循环次数不够,就不会进行进行JIT,也就无法做到向量化。
所以好多公司把这种着力于用其他语句实现来进行真正意义上的向量化。

参考

本文参考了

  1. 深度解读|Spark 中 CodeGen 与向量化技术的研究
  2. Velox: 现代化的向量化执行引擎
### Spark 向量化操作的功能及作用 #### 1. **向量化操作的核心概念** Spark向量化操作是一种优化机制,旨在通过批量处理数据的方式提高计算效率。它利用现代 CPU 架构中的 SIMD(Single Instruction, Multiple Data)特性[^3],从而显著提升性能。 #### 2. **向量化执行的具体方法** 向量化执行的关键在于批处理列式存储的数据结构。以下是其主要组成部分: - **Batched Columnar Data Layout**: 数据以列的形式存储并分批次加载到内存中。这种布局方式减少了随机访问开销,并提高了缓存命中率。 - **Filters and Conditionals**: 过滤器和条件表达式的评估被矢量化,允许一次处理多个记录而不是逐条处理。 - **Vectorized Hash Table**: 使用矢量化的哈希表加速连接和其他聚合操作的执行速度。 - **Vector Memory Management**: 高效管理内存分配和释放过程,减少垃圾回收的压力。 #### 3. **Parquet 文件的向量化读取** 在 Spark 中,`VectorizedParquetRecordReader` 是 Parquet 文件向量化读取的核心类。它的设计基于继承链 `VectorizedParquetRecordReader -> SpecificParquetRecordReaderBase -> RecordReader`,并通过增加 `initBatch`, `nextBatch`, 和 `resultBatch` 方法实现了高效的批处理逻辑[^2]。 #### 4. **实际应用案例** 当使用 Scala、Java 或 Python 开发机器学习模型时,可以结合各种库完成特征工程任务。例如,Python 用户可以选择 scikit-learn 来预处理数据;而 Scala 用户则可能更倾向于 Breeze 提供的强大线性代数能力[^1]。无论采用哪种编程语言,最终都会依赖于底层的向量化引擎来高效运行算法。 #### 5. **优势总结** - 性能提升:相比传统行式处理模式,向量化能够充分利用硬件资源,带来数量级上的提速效果。 - 资源节约:由于减少了不必要的中间状态保存动作,整体作业所需的内存空间也有所下降。 - 易用性强:开发者无需关心复杂的内部实现细节即可享受高性能带来的便利。 ```python from pyspark.sql import SparkSession spark = SparkSession.builder.appName("vectorization_example").getOrCreate() # 创建一个简单的 DataFrame 并启用向量化选项 df = spark.range(0, 100).repartition(8) # 执行某些操作,默认情况下会自动运用向量化技术 result_df = df.selectExpr("(id * id) as squared_id") result_df.show() ```
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值