作者:董伟柯,腾讯 CSIG 高级工程师
综述
Flink 作为流式数据处理框架的领跑者,在吞吐量、时延、准确型、容错性等方面都有优异的表现。在 API 方面,它为用户提供了较底层的 DataStream API,也推出了 Table API 和 SQL 等编程接口。特别来看,SQL 以其易用、易迁移的特点,深受广大用户的欢迎。
在常见的数据分析场景中,JOIN(关联)操作是一项很有挑战性的工作,因为它涉及到左右两个表(流)的状态匹配,对内存的压力较大;而相比恒定的批数据而言,流数据更加难以预测,例如数据可能乱序、可能晚到,甚至可能丢失,因此需要缓存的状态量更加庞大,甚至会严重拖慢整体的数据处理进度。由此可见,流的 JOIN 并没有一个全能且通用的方案,我们必须在 低时延 和 高精准 等维度间进行取舍。
考虑到不同业务场景的时效性、准确型要求不同,Flink 提供了多种流式的 JOIN 操作,用户可以根据实际情况选择最适合自己的类型。下面我们对它们逐一进行介绍。
常规 JOIN(Regular JOIN)
常规 JOIN(Regular JOIN)是语法最简单的一类 JOIN,和传统数据库的 JOIN 语法完全一致。对于左表和右表的任何变动,都会触发实时计算和更新,因此它的结果是“逐步逼近”最终的精确值,也就是下游可能看到变来变去的结果。为了支持结果的更新,下游目的表需要 定义主键 (PRIMARY KEY NOT ENFORCED)。
常规 JOIN 支持 INNER、LEFT、RIGHT 等多种 JOIN 类型。其中 INNER JOIN 只会下发 Upsert 数据流(即只有更新和插入,没有删除操作),而 LEFT 和 RIGHT JOIN 则会下发更多类型的 Changelog 数据流(包含了插入、更新、删除等各种类型)。对于各类数据流的区别和转化,请参见 Flink 官方文档:动态表。
常规 JOIN 运行时需要保留左表和右表的状态,且随着时间的推移,状态会无限增长,最终可能导致作业 OOM 崩溃或异常缓慢。因此我们强烈建议用户在 Flink 参数中设置 table.exec.state.ttl
选项,它可以指定 JOIN 状态的保留时间,以便 Flink 及时清理过期的状态。
下面是一个使用常规 JOIN 的示例作业:
CREATE TABLE `Order`
(
id INT,
product_id INT,
quantity INT,
order_time TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '100000',
'fields.product_id.min' = '1',
'fields.product_id.max' = '100',
'rows-per-second' = '1'
);
CREATE TABLE `Product`
(
id INT,
name VARCHAR,
price DOUBLE,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'datagen',
'fields.id.min' = '1',
'fields.id.max' = '100',
'rows-per-second' = '1'
);
CREATE TABLE `OrderDetails`
(
id INT,
product_name VARCHAR,
total_price DOUBLE,
order_time TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'print'
);
INSERT INTO `OrderDetails`
SELECT o.id, p.name, o.quantity * p.price, o.order_time
FROM `Order` o
INNER JOIN
`Product` p
ON o.product_id = p.id;
我们来看一下这个 SQL 作业生成的物理计划(红框标明的是 JOIN 部分):
可以看到,我们的双表 Regular JOIN 语句最终生成了 Join 算子,它从两个数据源里获取数据,且数据根据我们的 JOIN 键来进行哈希分配。
在该 Flink 作业的运行时,实际执行 JOIN 逻辑的是 org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOp