Flink SQL 双表 JOIN 介绍与原理简析

作者:董伟柯,腾讯 CSIG 高级工程师

综述

Flink 作为流式数据处理框架的领跑者,在吞吐量、时延、准确型、容错性等方面都有优异的表现。在 API 方面,它为用户提供了较底层的 DataStream API,也推出了 Table API 和 SQL 等编程接口。特别来看,SQL 以其易用、易迁移的特点,深受广大用户的欢迎。

在常见的数据分析场景中,JOIN(关联)操作是一项很有挑战性的工作,因为它涉及到左右两个表(流)的状态匹配,对内存的压力较大;而相比恒定的批数据而言,流数据更加难以预测,例如数据可能乱序、可能晚到,甚至可能丢失,因此需要缓存的状态量更加庞大,甚至会严重拖慢整体的数据处理进度。由此可见,流的 JOIN 并没有一个全能且通用的方案,我们必须在 低时延高精准 等维度间进行取舍。

考虑到不同业务场景的时效性、准确型要求不同,Flink 提供了多种流式的 JOIN 操作,用户可以根据实际情况选择最适合自己的类型。下面我们对它们逐一进行介绍。

常规 JOIN(Regular JOIN)

常规 JOIN(Regular JOIN)是语法最简单的一类 JOIN,和传统数据库的 JOIN 语法完全一致。对于左表和右表的任何变动,都会触发实时计算和更新,因此它的结果是“逐步逼近”最终的精确值,也就是下游可能看到变来变去的结果。为了支持结果的更新,下游目的表需要 定义主键 (PRIMARY KEY NOT ENFORCED)。

常规 JOIN 支持 INNER、LEFT、RIGHT 等多种 JOIN 类型。其中 INNER JOIN 只会下发 Upsert 数据流(即只有更新和插入,没有删除操作),而 LEFT 和 RIGHT JOIN 则会下发更多类型的 Changelog 数据流(包含了插入、更新、删除等各种类型)。对于各类数据流的区别和转化,请参见 Flink 官方文档:动态表。

常规 JOIN 运行时需要保留左表和右表的状态,且随着时间的推移,状态会无限增长,最终可能导致作业 OOM 崩溃或异常缓慢。因此我们强烈建议用户在 Flink 参数中设置 table.exec.state.ttl 选项,它可以指定 JOIN 状态的保留时间,以便 Flink 及时清理过期的状态。

下面是一个使用常规 JOIN 的示例作业:

CREATE TABLE `Order`
(
  id         INT,
  product_id INT,
  quantity   INT,
  order_time TIMESTAMP,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
     'connector' = 'datagen',
     'fields.id.kind' = 'sequence',
     'fields.id.start' = '1',
     'fields.id.end' = '100000',
     'fields.product_id.min' = '1',
     'fields.product_id.max' = '100',
     'rows-per-second' = '1'
);


CREATE TABLE `Product`
(
  id    INT,
  name  VARCHAR,
  price DOUBLE,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
     'connector' = 'datagen',
     'fields.id.min' = '1',
     'fields.id.max' = '100',
     'rows-per-second' = '1'
);


CREATE TABLE `OrderDetails`
(
  id           INT,
  product_name VARCHAR,
  total_price  DOUBLE,
  order_time   TIMESTAMP,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
     'connector' = 'print'
);


INSERT INTO `OrderDetails`
SELECT o.id, p.name, o.quantity * p.price, o.order_time
FROM `Order` o
INNER JOIN
`Product` p
ON o.product_id = p.id;

我们来看一下这个 SQL 作业生成的物理计划(红框标明的是 JOIN 部分):

b2a455d8b9d49d649f8ed85981964883.png

可以看到,我们的双表 Regular JOIN 语句最终生成了 Join 算子,它从两个数据源里获取数据,且数据根据我们的 JOIN 键来进行哈希分配。

在该 Flink 作业的运行时,实际执行 JOIN 逻辑的是 org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOp

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值