- 博客(28)
- 收藏
- 关注
原创 具体案例的Flink反压问题分析与解决方案
以下是结合具体案例的Flink反压问题分析与解决方案,涵盖定位、优化和验证全流程:案例背景某电商实时推荐系统从Kafka读取用户行为数据(每秒10万条),经过用户画像计算(CPU密集型)和协同过滤推荐(复杂模型推理)后,将结果写入Elasticsearch。在高并发场景下,出现以下现象:Flink Web UI显示用户画像计算算子反压比例达80%;TaskManager日志频繁报错Buffer pool exhausted;Elasticsearch写入延迟从200ms激增至10秒。
2025-05-22 10:58:37
531
原创 处理Flink反压问题
• 升级资源配置:增加TaskManager实例或调整TaskManager内存(taskmanager.memory.process.size)。• 查看Flink日志中的反压警告信息,结合任务处理时间、吞吐量等指标,判断是否由算子逻辑复杂或外部系统延迟引起。• 调整Checkpoint策略:减少Checkpoint频率或启用增量Checkpoint,降低对处理流的干扰。• 调整资源分配:为高负载算子分配更多CPU/内存,或启用Flink Reactive Mode动态调整资源。
2025-05-22 10:53:55
374
原创 flink 调优手段
• RocksDB vs Gemini:优先使用Gemini状态后端(阿里云VVR内置),其内存访问性能优于RocksDB,减少磁盘I/O。• 堆外内存分配:调整taskmanager.memory.managed.fraction(默认0.4),增加托管内存以提升状态性能。• 减少磁盘读取:增加托管内存比例(taskmanager.memory.managed.fraction),提升状态命中率。• 状态TTL:通过setTTL为状态设置生命周期,自动清理过期数据,避免状态膨胀。
2025-05-22 10:50:29
349
原创 Hive表动态分区和静态分区
单任务最大分区数。• 避免小文件:通过 hive.merge.size.per.task 合并小文件,减少HDFS压力。• 最佳实践:结合两者优势,对固定维度使用静态分区,动态维度使用动态分区,并通过参数调优提升性能。• 混合分区策略:对固定维度(如年份)使用静态分区,动态维度(如月份)使用动态分区。• 动态分区:适合数据量大、分区值动态变化的场景,灵活性高但需关注小文件问题。• 小规模数据:数据量小且分区值明确,避免动态分区的额外开销。• 特点:分区值动态生成,适用于数据量大且分区值未知的场景。
2025-05-22 10:48:58
355
原创 Hive分桶(Bucketing)和分区(Partitioning) 区别
◦ 存储结构:每个分区对应HDFS中的一个文件夹(如 year=2024/month=05/)。◦ 存储结构:桶对应逻辑上的文件(如 000001_0),数据通过哈希取模分配。◦ 场景:数据有明显分类维度(如时间、地域),且查询常基于这些维度过滤。◦ 定义:按指定列(如日期、地域)的离散值将数据物理分割到不同目录中。◦ 定义:按指定列的哈希值将数据均匀分布到固定数量的桶(文件)中。◦ 示例:按用户ID分桶,JOIN时仅需处理相同桶的数据。◦ 优点:避免全表扫描,适合高基数列(如日期)的过滤。
2025-05-22 10:46:26
282
原创 spark rdd的5大特性
◦ 支持多级存储策略(如MEMORY_ONLY、MEMORY_AND_DISK_SER),平衡内存与CPU开销。◦ 提升并行计算效率,例如1GB数据可分成100个分区(每个10MB),由不同节点处理。◦ 转换操作(如flatMap)生成新RDD,行动操作(如count)触发实际计算。• 定义:RDD的数据被划分为多个逻辑分区,分布在集群的不同节点上并行处理。◦ 容错机制:数据丢失时,通过血统重新计算恢复(如窄依赖仅需重算父分区)。• 定义:RDD记录其生成过程(依赖关系),形成有向无环图(DAG)。
2025-05-22 10:42:02
388
原创 读取kafka的偏移量的工具类
读取kafka的偏移量的工具类// An highlighted blockimport java.util.Propertiesimport org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.TopicPartitionimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.sp
2021-10-11 18:50:08
294
原创 Maxwell 的使用
Maxwell 的使用1.Maxwell介绍Maxwell介绍Maxwell是由美国zendesk开源,用java编写的Mysql实时抓取软件,其抓取的原理也是基于binlog。2.Maxwell 和canal工具对比➢Maxwell没有canal那种server+client模式,只有一个server把数据发送到消息队列或redis。如果需要多个实例,通过指定不同配置文件启动多个进程。➢Maxwell有一个亮点功能,就是canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell
2021-10-11 18:27:00
1002
原创 flink 读取kafka数据
下面展示一些 内联代码片。package io.github.interestinglab.waterdrop.flink.testimport java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.connectors.
2021-06-08 16:43:21
283
原创 flink 将数据写入到kafka
下面展示一些 内联代码片。package io.github.flink.test/** * Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的 * 输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。 * stream.addSink(new MySink(xxxx)) */import org.apache.flink.api.common.serialization.SimpleStringSchemai
2021-06-08 16:34:35
724
原创 多线程爬取携程网酒店数据,星级数据和经纬度
下面展示一些 内联代码片。多线程爬取携程网酒店星级数据和经纬度import threadingfrom queue import Queueimport configimport urllibimport reimport pandas as pdnum_of_threads=10def write_fun(line): with open('酒店0502.csv','a',encoding='utf-8') as f: f.write(line)
2021-06-08 16:22:39
1162
原创 flink source 写入mysql或者jdbc
// A code blockvar foo = 'bar';package io.github.flink.testimport java.util.Randomimport org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}import org.apache.flink.streaming.api.functions.source.SourceFunctionimport org.
2021-06-08 15:45:40
298
原创 flink 链接source 将数据写入Elasticsearch
maven 依赖 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.9.2</version> </dependency>
2021-06-08 11:47:08
318
原创 flink KeyedProcessFunction 源码分析
KeyedProcessFunction 继承 AbstractRichFunction 函数Function为基础函数接口,为纯接口里面没有方法RichFunction 继承基础接口Function里面封装了到了运行环境函数,open和close 方法还有得到迭代运行时间内容public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { private static fi.
2021-06-04 17:11:02
273
1
原创 flink BoundedOutOfOrdernessTimestampExtractor
BoundedOutOfOrdernessTimestampExtractor 实现了AssignerWithPeriodicWatermarks (注册周期生成Watermarks)源码如下第一次注册会生成当前最大时间戳currentMaxTimestamp =lastEmittedWatermark - 时间间隔(maxOutOfOrderness)public abstract class BoundedOutOfOrdernessTimestampExtractor<T>
2021-06-03 17:57:44
1380
1
原创 Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.DocWriteRequest.parent()Ljava/lang/
// 创建index request,准备发送数据val indexRequest = Requests.indexRequest().index(“wtte”).type(“data”).source(json)flink 写入elasticsearch 报错 Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.DocWriteRequest.parent()Ljava/lang/解决 添加maven 依赖 &l
2021-06-02 15:15:58
446
原创 ES常用命令
#查看集群健康Get /_cat/health/?vGet /_cat/health?v#查看节点情况GET /_cat/nodes/?v#查询各个索引状态GET /_cat/indices?v#创建一个索引PUT /movie_indexPUT /sensor#删除一个索引DELETE /movie_index#查看某一个索引的分片情况GET /_cat/shards/movie_index?vGET /_cat/shards/sensor?v#创建文.
2021-06-02 14:38:32
299
原创 Caused by: redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required
flink 链接redis报错Caused by: redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required原因是没有权限(NOAUTH Authentication required),解决办法就是加上val conf = new FlinkJedisPoolConfig.Builder().setHost(“127.0.0.1”).setPort(6379).setPassword(’’
2021-06-02 11:42:35
929
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.typeinfo.TypeInformation 错误
Exception in thread “main” java.lang.NoClassDefFoundError: org/apache/flink/api/common/typeinfo/TypeInformationat io.github.interestinglab.waterdrop.flink.test.RedisSinkTest.main(RedisSinkTest.scala)Caused by: java.lang.ClassNotFoundException: org.apache
2021-06-02 11:22:56
821
原创 Flink入门参数设置(二)
// An highlighted blockpublic class ConfigKeyName { public final static String TIME_CHARACTERISTIC = "execution.time-characteristic"; public final static String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout"; public final static String PAR
2021-05-27 14:26:55
371
3
原创 Flink 入门和环境创建以及各种参数含义(一)
// An highlighted blockimport org.apache.flink.api.common.time.Time;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;import org.apache.flink.runtime.state.StateBackend;import o..
2021-05-27 11:43:35
216
原创 获取马蜂窝景点数据和景点评论数据
CREATE TABLE `poi` ( `poi_id` int NOT NULL, `name` varchar(128) DEFAULT NULL, `image` varchar(512) DEFAULT NULL, `link` varchar(512) DEFAULT NULL, `lat` float DEFAULT NULL, `lng` float DEFAULT NULL, `type` int DEFAULT NULL, `is_cnmain` int.
2021-05-25 13:24:40
1519
3
原创 获取搜狗指数数据
// An highlighted blockimport requests, re ,jsonfrom bs4 import BeautifulSoupimport randomimport jsonimport timeuser_agent_list = [ 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.132 Saf
2021-05-25 11:39:16
237
原创 获取www.maigoo.com 景点爬虫,包括1A到5A级景点名称。景点地址,景点描述和景点经纬度
下面展示一些 内联代码片。// An highlighted blockimport requests, re ,jsonfrom bs4 import BeautifulSoupimport randomimport jsonimport timeuser_agent_list = [ 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.
2021-05-25 09:51:37
1071
原创 Spark read.json 用法
// An highlighted block val otherPeopleDataset = ssc.createDataset( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val otherPeople = ssc.read.json(otherPeopleDataset) otherPeople.show() val schema = ssc.re
2021-05-25 09:34:10
1523
原创 spark解析json用到的算子
// A code block// An highlighted block import org.apache.spark.sql.types._ import org.apache.spark.sql.functions._ //定义字段类型 val jsonSchema = new StructType().add("battery_level", LongType).add("c02_level",LongType).add("cca3",StringType).add
2021-05-24 18:25:42
168
原创 spark 报错 No implicits found for parameter evidence$4: Encoder
spark 报错 No implicits found for parameter evidence$4: Encoder解决添加 import ssc.implicits._// An highlighted block//No implicits found for parameter evidence$4: Encoderimport scala.collection.JavaConversions._import org.apache.spark.sql.SparkSessionimpo
2021-05-24 15:06:41
664
5
空空如也
空空如也
TA创建的收藏夹 TA关注的收藏夹
TA关注的人