
spark
文章平均质量分 78
鸿乃江边鸟
Apache Spark Contributor
专注于技术的dotaer
展开
-
Spark中排序--前缀排序prefixSort
中 内存排序(UnsafeInMemorySorter)最基本的思想:先根据前缀比较算法进行比较,如果相等的话,则再遍历实际数据的指针去获取真正的数据进行比较,这种可以规避随机内存读取从而提交缓存的命中率,进而提高比较的速度。这里特别说一下:两种类型的BinaryType(对应内部的类型为Array[Byte]) 和 StringType(对应的内部的类型为UTF8String) 获取prefix的.会根据Spark的内部类型,获取Long类型的可以用于比较的值,所以我们可以看到在。原创 2025-04-03 18:32:22 · 1072 阅读 · 0 评论 -
Spark SQL中怎么注册python以及使用python注册的UDF中数据流是怎么流转的
分享本文的目的就在于 使读者明确 怎么在Spark SQL中调用 python注册的UDF,这里的的SQL 可以不仅仅是在 python api 中调用,也可以是在 java或者scala api中调用的。,好像都没有说到在Spark SQL中怎么直接调用 python定义的UDF,但是其实在使用上,Spark SQL是可以直接使用 python定义的UDF的,可以看到 这种运行 python UDF的方式是以socket的方式进行交互的,所以这种方式相对来说还是会比较慢的。原创 2024-10-04 17:33:46 · 777 阅读 · 0 评论 -
Spark AQE 导致的 Driver OOM问题
因为原则上来说,如果没有开启AQE之前,一个SQL执行单元的是属于同一个Job的,开启了AQE之后,因为AQE的原因,一个Job被拆成了了多个Job,但是从逻辑上来说,还是属于同一个SQL处理单元的所以还是得归属到一次执行中。类在内存中存放着 一个整个SQL查询链的所有stage以及stage的指标信息,在AQE中 一个job会被拆分成很多job,甚至几百上千的job,这个时候 stageMetrics的数据就会成百上倍的被存储在内存中,从而导致。主要的作用是设置当前计划的所属的。该方法会获取事件中的。原创 2024-04-26 22:39:30 · 1528 阅读 · 3 评论 -
Spark Rebalance hint的倾斜的处理(OptimizeSkewInRebalancePartitions)
假如说hash(col)为0,那实际上只有reduceTask0有数据,其他的ReduceTask1等等都是没有数据的,所以最终只有ReduceTask0写文件,并且只有一个文件。这些值配置,如果这些配置调整的不合适,就会导致写文件的时候有可能只有一个Task在运行,那么最终就只有一个文件。的作用是对小文件进行拆分,使得罗盘的文件不会太大,这个会有个问题,如果我们在使用。的作用主要是进行文件的合并,是得文件不会太小,本文基于Spark 3.5.0。的值是固定的,比如说值永远是。原创 2024-03-21 09:05:55 · 1447 阅读 · 0 评论 -
关于Spark中OptimizeShuffleWithLocalRead 中自己的一些理解
这种情况下,在Spark的内部表示 ShuffleOrigin 为 REBALANCE_PARTITIONS_BY_NONE,这种情况下 是hint为。这里的条件默认是根据shuffle的个数来计算的,如果优化后的shuffle数有增加,则会回退到之前的物理计划中去,当然用户也可以配置。针对第二种情况,这种情况一般来说都是有正向的提升效果的,但是也会经过第一种情况的逻辑判断。规则下,有可能会增加额外的Shuffle操作,这种情况就是负优化了,所以在进行了。的作用简单的来说,就是会按照一定的规则,从一个。原创 2024-03-06 22:58:52 · 931 阅读 · 0 评论 -
Spark中读parquet文件是怎么实现的
因为对于Spark来说,任何一个事情都不是独立的存在的,比如说parquet文件的rowgroup设置的大小对读写的影响,以及parquet写之前排序对读parquet的影响,以及向量化读取等等。为‘true’(默认就是true),则会进行unsafeRow的转换,当然这里的好处就是节约内存以及能够减少GC。最近在整理了一下 spark对Parquet的写文件的过程,也是为了更好的理解和调优Spark相关的任务,这条filter,则只会拿出rowgroup的信息和rowgrups的的行数。原创 2024-03-04 20:27:22 · 1225 阅读 · 0 评论 -
Spark中写parquet文件是怎么实现的
的时候得注意不能调整过大,否则会导致OOM,但是如果在最后写文件的时候加入合并小文件的功能(AQE+Rebalance的方式),也可以适当的调整大一点,因为这个时候的Task 不像没有shuffle一样,可能还会涉及到sort以及aggregate等消耗内存的操作,(这个时候就是一个task纯写parquet文件)这三个配置项存在着相互制约的关系,总的目标就是检查当行数达到了一定的阈值以后,来检查是否能够flush到内存page中,具体的可以查看。表示的是partition名字的常量。原创 2024-02-22 22:53:08 · 1587 阅读 · 0 评论 -
Spark中多分区写文件前可以不排序么
会根据partition或者bucket作为最细粒度来作为writer的标准,如果相邻的两条记录所属不同的partition或者bucket,则会切换writer,所以说如果不根据partition或者bucket排序的话,会导致。频繁的切换,这会大大降低文件的写入速度。目前 Spark中的实现中,对于多分区的写入默认会先排序,这是没必要的。至于Spark在写入文件的时候会加上Sort,这个是跟写入的实现有关的,也就是。这两个物理计划中,最终写入文件/数据的时候,会调用到。(默认值为0),则会加上。原创 2024-02-15 22:30:11 · 1119 阅读 · 0 评论 -
Spark 中 BroadCast 导致的内存溢出(SparkFatalException)
这个问题折腾了我大约2个小时,错误发生的上下文都看了不止十遍了,还是没找到一丝头绪,可能是上帝的旨意,在离错误不到50行的地方,对于一个在大数据行业摸爬滚打了多年的老手来说,第一眼肯定是跟着堆栈信息进行排查,目前在排查 Spark 任务的时候,遇到了一个很奇怪的问题,在此记录一下。在查找错误的时候,还是得在错误的上下文中多翻几页。这个类,但是就算把代码全看一遍也不会有所发现。, 没想到是 OOM 问题。理所当然的就是会找到。原创 2024-01-08 17:57:35 · 1310 阅读 · 0 评论 -
Spark Paimon 中为什么我指定的分区没有下推
针对于错误的写法,也就是导致读取全量数据的写法,我们分析一下,首先是类型转换阶段,在Spark中,对于类型不匹配的问题,spark会用规则进行转换,具体的规则是。最近在使用 Paimon 的时候遇到了一件很有意思的事情,写的 SQL 居然读取的数据不下推,明明是分区表,但是却全量扫描了。这种情况下,对于文件的读取IO会增大,但是对于shuffle等操作是不会有性能的影响的。对于分区字段来说,我们在写SQL对分区字段进行过滤的时候,保持和分区字段类型一致。可以看到经过了规则转换 所有的过滤条件都下推到了。原创 2023-12-14 18:02:27 · 959 阅读 · 0 评论 -
Spark升级中对log4j的一些思考
最终我们只留下了log4j2 (log4j-core + log4j-api) + logback (logback-classic + logback-core) ,其他的都排除掉,web端打包加编译没有任何问题,一切还是那么的美好(毕竟花了一天时间)在 spark3.1中采用的是log4j1 (log4j + slf4j-log4j2),spark 3.5中采用的是log42(log4j-core + log4j-api + log4j-slf4j2-impl),原创 2023-11-27 23:25:11 · 1606 阅读 · 0 评论 -
Spark调优案例分享
注意是Mac Keynote。原创 2023-11-13 20:15:49 · 974 阅读 · 0 评论 -
Spark UI中Shuffle dataSize 和shuffle bytes written 指标区别
目前在做一些知识回顾的时候,发现了一些很有意思的事情,就是Spark UI中ShuffleExchangeExec 的dataSize和shuffle bytes written指标是不一样的,指的是写入文件的字节数,会区分压缩和非压缩,如果在开启了压缩(也就是spark.shuffle.compress true)和未开启压缩的情况下,该值的大小是不一样的。那么在AQE阶段的时候,是以哪个指标来作为每个Task分区大小的参考呢。的实例,这样就获取到了实际内存中的每个分区的大小,原创 2023-10-27 07:38:47 · 897 阅读 · 0 评论 -
Spark 3.4.x 对 from_json regexp_replace组合表达式慢问题的解决
该计划的差异主要部分还是在于Rule在和的差别处理。原创 2023-08-12 16:44:47 · 514 阅读 · 0 评论 -
Spark 3.1.1 遇到的 from_json regexp_replace组合表达式慢问题的解决
最主要关心的是 parser这个变量,因为由于上述规则的原因,两个schema单独在不同的parser中,而这里的 Child是由regexp_replace表达式组成的,所以该正则表达式会计算两次,这里就会解析为 Alias(GetStructField(attribute.get, i), f.name)()(主要就是调用JsonToStructs.toString的方法)进行 UnresolvedStar 的expand方法的调用。主要的原因是 Spark 3.1.x 引入的。原创 2023-08-04 20:29:50 · 877 阅读 · 0 评论 -
Spark SQLHadoopMapReduceCommitProtocol中mapreduce.fileoutputcommitter.algorithm.version选择1还是2
大概的意思因为要保证task commits的原子性,所以好的建议是remove掉v2,不推荐使用V2。所以最后得出的结论就是:V1是安全的,但是性能不好,V2有可能是不安全的,但是性能好,推荐使用V1。FileOutputCommitter.commitTask方法。也就是为了保证spark向前向后的兼容性,强行设置为。dataWriter.write和commit方法。该executeTask方法最后会调用。当然Spark官方文档也有解释。对于spark来说默认的。更多关于细节,可以参考。原创 2023-08-02 23:20:14 · 507 阅读 · 0 评论 -
Apache Hudi初探(十一)(与spark的结合)--hudi的markers机制
虽然说在Executor端写入了多个重复数据的文件,但是因为在只有一个真正的文件会被Driver认可,所以通过最终返回的被driver认可的文件和marker文件求交集就能删除掉其他废弃的文件。在写入真正文件的同时,会在 .hoodie/.temp/instantTime目录下创建maker文件,比如.hoodie/.temp/202307237055/f1.parquet.marker.CREATE,之后在task.commit的时候会把临时目录的文件真正的移到需要写入的目录下。原创 2023-07-23 10:24:06 · 330 阅读 · 0 评论 -
Apache Hudi初探(十)(与spark的结合)--hudi的Compaction操作
这步操作主要是把生成的Compaction plan序列化成字节,并保存在相应的文件中,并生成一个Compaction的Request。baseFile,partitionPath,logFiles,还有Compaction策略。该方法主要是生成一个调度Compaction的计划。执行当前Commit的compaction操作。重新运行上次失败的Compaction计划。是够是异步Compaction计划生成。中,我们没有过多的解释Spark中。的实现,在这里详细说一下。原创 2023-07-22 16:57:01 · 729 阅读 · 0 评论 -
Spark中为什么Left join比Full join 快
如果在语意允许的情况下,选择left join可以大大加速任务运行,笔者遇到的情况就是 left join 运行了。一样,唯一不一样的是SortMergeJoin 的child的outputPartitioning是。后如果不重新shuffle,会导致一个任务中会有id为null值的存在,会导致join的结果不正确。来说就不一样了,task join完后id还是保持原来的就不会变,所以就不必重新shuffle。只有在读取source文件完之后才会有Exchange的shuffle的操作。原创 2023-07-16 22:10:53 · 624 阅读 · 0 评论 -
Delta数据湖upsert调优---1000多列表的调优
最终会调用反射去获取字段,要知道反射是比较消耗时间的,要知道我们现在是有1000多个字段,如果每一行都会被反射1000次,再加上几十亿行的数据,这个计算速度肯定是比较慢的,而且为了达到更新的效果,我们还调用了。目前在我们公司遇到了一个任务写delta(主要是的。delta的MergeIntoCommand。操作,这又增加了cpu的计算(1000多次)set该schema的字段,其他的字段不变。实现的,该操作的的具体实现,可以参考。操作),写入的时间超过了。原创 2023-07-04 23:49:27 · 368 阅读 · 0 评论 -
Spark 3.4.0新特性--UI支持存储在RocksDB中
来说,目前存储所有的事件信息以及UI所需要的信息都是默认存储在内存中,这在CS中,对于以。作为存储以后,能够减少driver所需内存,并且引进新的。数据结构为InMemoryStore。能够大大加快spark事件的读写事件。Spark UI和SHS。原创 2023-07-01 15:39:00 · 1377 阅读 · 0 评论 -
Spark 3.4.x Server Client模式下的数据传输实现
中,我们提到Spark 3.4.x中是Client和Server之间的数据传输是采用。的,那具体是怎么实现的呢?这里的逻辑就是转换为。原创 2023-06-27 23:17:43 · 418 阅读 · 0 评论 -
Apache Hudi初探(一)(与flink的结合)
long ckpTimeout = * 获取到。的方式,只需要引入了对应的jar包即可,以。的方式,所以不需要像使用。的超时时间,并设置为。原创 2023-06-18 16:53:18 · 1566 阅读 · 1 评论 -
Apache Hudi初探(九)(与spark的结合)--非bulk_insert模式
来说,什么也不操作(因为该index每次都会从parquet文件中读取信息从而组装成index),构建一个状态信息,主要是记录一下插入的记录数量和更新的记录数量 其中主要形成了以。(默认是true)会进行元数据的commit操作,这些commit的操作和之前。,则会要求排序,如果没有则只是按照partitioner进行重分区,(这里暂时忽略),主要是对数据进行分区处理,设计到小文件的处理。操作,所以没有去重的需要,所以直接采用spark原生的方式,实例的构造方法中会进行一些额外的操作。原创 2023-06-10 06:17:02 · 1328 阅读 · 0 评论 -
Apache Hudi初探(八)(与spark的结合)--非bulk_insert模式
并且从*.hoodie/20230530073115535.deltacommit* 获取internalSchemaOpt,具体的合并就是把即将写入的schema和internalSchemaOpt进行合并。因为是"bulk insert"操作,所以没有去重的需要,所以直接采用spark原生的方式,把df的schema转换成avro的schema。),则会进行去重处理,具体是调用。开始写操作,这涉及到回滚的操作。,就会进行schema的合并。是没有的,所以不会开启异步的。原创 2023-06-01 07:22:07 · 816 阅读 · 0 评论 -
Spark full outer join 数据倾斜导致OOM
spark full outer join目前存在一个问题,那就是在数据倾斜的时候,会导致Execuotr OOM:具体的问题描述,可以见。原创 2023-04-22 08:18:22 · 314 阅读 · 0 评论 -
SPARK中InMemoryFileIndex文件缓存导致的REFRESH TABLE tableName问题
其中cachedLeafDirToChildrenFiles的值会在InMemoryFileIndex对象初始化的时候进行赋值,对应的方法为。那是因为,在一个jvm中,比如说是写了之后再读取,会进行。的情况下,在转换对应的逻辑计划当中,如果缓存中存在对应的表,则会复用缓存中的,具体的方法在。的错误,这种错误的原因有一种隐形的原因,那就是。在scan file的过程中,最主要涉及的是。对象,从而实现了文件的复用,从未导致问题。会缓存需要scan的文件在内存中,中 ,最主要的点是会公用同一个。原创 2023-03-12 22:22:17 · 1645 阅读 · 0 评论 -
SPARK outputDeterministicLevel的作用--任务全部重试或者部分重试
方法的时候,如果任务发生了重试,就有可能导致任务的数据不准确,那这个时候改怎么解决这个问题呢?该方法主要用于在重新提交失败的stage时候,用来判断是否需要重新计算上游的所有任务。这里如果任务Fetch失败了,根据该shuffle所对应的上游stage是不是。方法是随机分配数据到下游,这会导致一个问题,有时候如果我们用。所以根据以上分析,我们可以改写对应的RDD的。那么该变量的作用是什么呢?方法进行上游所有任务或者单个任务的重试。方法来进行stage任务的全部重试与否。方法进行循环调用上游的。原创 2023-02-20 21:54:30 · 625 阅读 · 0 评论 -
Spark 3.1.1 shuffle fetch 导致shuffle错位的问题
进行了重组(在获得streamHandle的时候内部会根据reduceIdArr构建blocks索引,下文中会说到)会导致和成员变量blockIds的顺序不一致,为什么两者不一致会导致问题呢?所以在以上两种情况下,只要有重新fetch数据的操作,就会存在数据的错位,导致数据的不准确。但是这个和createFetchShuffleBlocksMsg输出的顺序是不一致的,的索引下标,也就是下文中numBlockIds组成的数组下标,这里和上面的一样,只不过对应的方法为。这个方法的作用就是: 构建一个。原创 2023-02-15 21:42:33 · 1417 阅读 · 2 评论 -
Spark做TPC-DS性能测试
最近由于在做上云的工作,并且公司离线部分引擎是Spark,所以做了一次基于TPC-DS性能比对测试。生产了大约200GB的数据。原创 2023-01-31 22:05:23 · 3154 阅读 · 2 评论 -
SPARK中metrics是怎么传递的
在任务运行期间,利用heartbeat心跳来传递metrics在任务结束以后,利用任务结果的更新来传递metrics最终,都是通过sparkListener:SQLAppStatusListener和 AppStatusListener分别完成Spark UI状态的更新。原创 2022-12-12 23:19:40 · 1199 阅读 · 0 评论 -
Delta Lake中CDC的实现
根据以上的分析,可以知道目前的CDF只是在Delte层级做了反馈,如果说想要在Flink层达到CDC的效果,还得有个中间层,把delta里的CDF的数据给读取出来,转换Flink 内部形式的ChangelogMode CDC格式(比如说。CDF是能让表能够输出数据表变化的能力,CDC是能够捕获和识别数据的变化,并能够将变化的数据交给下游做进一步的处理。是判断过滤条件是否是在元数据层能够囊括,如果可以的话,就通过。我们来分析一下是怎么做到数据行级别的CDF的。这里列举该表的所有文件,并把所有的文件标识为。原创 2022-12-11 08:05:16 · 710 阅读 · 0 评论 -
SPARK中关于HighlyCompressedMapStatus的说明(会造成运行时的数据不精确)
这是来map端任务记录精确分区的阈值,如果大于该阈值,则会记录真实的reduce数据的分区大小,如果小于则记录的是每个reduce大小的平均值(这导致会在reduce获取运行时的数据大小信息时数据不准确的问题,从而导致AQE的效果不是很理想)。采用的是对于数据量小的reduce分区数据采用公用平均值的方式,这在一定程度上能够减缓Driver OOM的概率,中,这样下游的reduce任务如果需要获取上游MapTask的运行情况的时候就会最终调用到。因为对于小数据量的分区,只需要存储一个平均值,而不像。原创 2022-11-06 22:45:01 · 1490 阅读 · 0 评论 -
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(10)
从一个unit test来探究SPARK Codegen的逻辑,本文基于 SPARK 3.3.0。原创 2022-09-20 12:29:26 · 250 阅读 · 0 评论 -
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(9)
从一个unit test来探究SPARK Codegen的逻辑,sortAggregateExec的。本文基于 SPARK 3.3.0。updateExprs的不同。操作完后的结果buffer。其他的数据流向和之前的一样,方法,其中input为。阶段,所以更新语句是。原创 2022-09-20 12:28:41 · 904 阅读 · 0 评论 -
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(8)
constructDoConsumeFunction方法中inputVarsInFunc。从一个unit test来探究SPARK Codegen的逻辑,本文基于 SPARK 3.3.0。根据数据类型的不同,调用。原创 2022-09-19 20:44:36 · 899 阅读 · 0 评论 -
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(7)
val buffer = 和buffer.init(index, Array(iter))从一个unit test来探究SPARK Codegen的逻辑,对rdd进行迭代,对于当前的第一阶段全代码生成来说,该。代码进行编译,如果编译报错,则回退到原始的执行。代码初始化,对于当前第一阶段全代码生成来说,端进行编译,如果代码生成有误能够提前发现。(默认值),则回退到原始的执行。不会被用到,第二阶段中会把。不会被用到,因为数据是由。如果代码生成的长度大于。对于当前来说,目前只是。会被用来进行产生数据,原创 2022-09-18 22:53:45 · 547 阅读 · 0 评论 -
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(6)
从一个unit test来探究SPARK Codegen的逻辑,本文基于 SPARK 3.3.0。中我们提到在对应的函数计算完后,是已经计算处理的结果了。的UnsafeRow。原创 2022-09-17 07:38:09 · 309 阅读 · 0 评论 -
SPARK中的wholeStageCodegen全代码生成--以aggregate代码生成为例说起(5)
是在SortAggregateExec的produce方法进行赋值的,也就是对应“SUM”和“COUNT”初始值的。传递进来了,但是在这个方法中却没有用到,因为对于大部分情况来说,该变量是对外部传递InteralRow的作用。,对于AVG聚合函数来说,聚合的缓冲属性(aggBufferAttributes)为。进行公共子表达式的消除,并提前计算出在计算子表达式计算之前的自表达式。对于当前的计划来说,SortAggregateExec的。本文基于 SPARK 3.3.0。对于目前的物理计划来说,当前的。原创 2022-09-15 20:32:27 · 814 阅读 · 0 评论 -
SPARK中的wholeStageCodegen全代码生成--GenerateUnsafeProjection.createCode说明
val writeFieldsCode =以及后面的代码组装。因为inputs的类型是LONG类型,所以对应到。因为inputs为null为false,所以。对每一个变量的赋值按照换行符进行分隔。对于在在RangeExec中出现的。原创 2022-09-14 20:18:58 · 578 阅读 · 0 评论