Many developers, myself included, have long assumed that setting 'execution.checkpointing.mode'='EXACTLY_ONCE' in a Flink SQL job is enough to achieve end-to-end exactly-once message delivery. After setting the parameter, few developers actually verify that the running Flink job's delivery semantics match expectations. Let's walk through an example.
Requirement
Our company runs a Flink SQL streaming job: the source is a MySQL CDC table, and after a simple computation the result is sunk to a Kafka table; the connector used here is upsert-kafka.
The requirement is simple: read the product quantity and unit price from the order table, compute the total amount, and send the result to Kafka, as shown in the diagram below.
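Before looking at the pipeline configuration, the per-row computation itself is trivial. A minimal plain-Python sketch with made-up sample rows (the names and values are illustrative, not taken from the real table):

```python
# Sketch of the per-row computation the Flink SQL job performs:
# amount = qty * price. Sample data is invented for illustration.
orders = [
    {"id": 1, "prod_name": "apple", "qty": 3, "price": 5.0},
    {"id": 2, "prod_name": "pear", "qty": 2, "price": 4.5},
]

def with_amount(row):
    # Return a copy of the row extended with the computed amount column.
    out = dict(row)
    out["amount"] = row["qty"] * row["price"]
    return out

result = [with_amount(r) for r in orders]
print(result[0]["amount"])  # 15.0
```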
Implementation
Initial code
SET 'execution.checkpointing.mode'='EXACTLY_ONCE';
SET 'execution.checkpointing.interval'='60000';
SET 'pipeline.operator-chaining.enabled'='false';
DROP TABLE IF EXISTS kfk_order;
CREATE TABLE IF NOT EXISTS kfk_order(
`id` INT NOT NULL,
`prod_name` STRING,
`qty` INT,
`price` DOUBLE,
`amount` DOUBLE,
`created_time` TIMESTAMP,
`updated_tme` TIMESTAMP,
PRIMARY KEY ( `id` ) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'order',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);
DROP TABLE IF EXISTS t_order;
CREATE TABLE IF NOT EXISTS t_order (
`id` INT NOT NULL,
`prod_name` STRING,
`qty` INT,
`price` DOUBLE,
`created_time` TIMESTAMP,
`updated_tme` TIMESTAMP,
PRIMARY KEY ( `id` ) NOT ENFORCED
) WITH (
'server-id' = '1000',
'server-time-zone' = 'Asia/Shanghai',
'scan.incremental.snapshot.enabled' = 'false',
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'flink-demo',
'table-name' = 't_order'
);
INSERT INTO kfk_order
SELECT `id`
, `prod_name`
, `qty`
, `price`
, `qty` * `price` AS amount
, `created_time`
, `updated_tme`
FROM t_order;
Flink UI
After the Flink job was submitted, inserting a record into the t_order table produced a result message in Kafka immediately. This did not match my understanding of Flink's exactly-once behavior:
with exactly-once, result data should only be committed to Kafka after a checkpoint completes. The checkpoint interval was set to one minute, yet messages arrived in Kafka within seconds, so end-to-end exactly-once was not actually achieved.
After consulting the official documentation and other resources, it turned out that this configuration only guarantees exactly-once for the operators inside the Flink job; the Kafka sink defaults to at-least-once, which explains the behavior described above.
Adjusted code
SET 'execution.checkpointing.mode'='EXACTLY_ONCE';
SET 'execution.checkpointing.interval'='60000';
SET 'pipeline.operator-chaining.enabled'='false';
DROP TABLE IF EXISTS kfk_order;
CREATE TABLE IF NOT EXISTS kfk_order(
`id` INT NOT NULL,
`prod_name` STRING,
`qty` INT,
`price` DOUBLE,
`amount` DOUBLE,
`created_time` TIMESTAMP,
`updated_tme` TIMESTAMP,
PRIMARY KEY ( `id` ) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'order',
'properties.bootstrap.servers' = 'localhost:9092',
'sink.delivery-guarantee' = 'exactly-once',
'sink.transactional-id-prefix' = 'tx',
'properties.transaction.timeout.ms' = '65000',
'key.format' = 'json',
'value.format' = 'json'
);
DROP TABLE IF EXISTS t_order;
CREATE TABLE IF NOT EXISTS t_order (
`id` INT NOT NULL,
`prod_name` STRING,
`qty` INT,
`price` DOUBLE,
`created_time` TIMESTAMP,
`updated_tme` TIMESTAMP,
PRIMARY KEY ( `id` ) NOT ENFORCED
) WITH (
'server-id' = '1000',
'server-time-zone' = 'Asia/Shanghai',
'scan.incremental.snapshot.enabled' = 'false',
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'flink-demo',
'table-name' = 't_order'
);
INSERT INTO kfk_order
SELECT `id`
, `prod_name`
, `qty`
, `price`
, `qty` * `price` AS amount
, `created_time`
, `updated_tme`
FROM t_order;
Setting 'execution.checkpointing.mode'='EXACTLY_ONCE' alone is not enough. End-to-end exactly-once is implemented via a two-phase commit protocol and relies on transactions on the sink side, so a few key options must be added to the sink table, covering both the delivery guarantee and the transaction-related settings:
'sink.delivery-guarantee' = 'exactly-once',
'sink.transactional-id-prefix' = 'tx',
'properties.transaction.timeout.ms' = '65000',
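A note on `properties.transaction.timeout.ms`: the Kafka transaction stays open until the checkpoint completes, so this value must exceed the checkpoint interval (here 65000 ms > the 60000 ms interval). It also must not exceed the broker-side upper bound `transaction.max.timeout.ms`, which defaults to 15 minutes. A sketch of the broker setting, assuming access to the broker's `server.properties`:

```properties
# Kafka broker config (server.properties): upper bound for client
# transaction timeouts. Flink's properties.transaction.timeout.ms
# must be <= this value. 900000 ms (15 min) is the Kafka default.
transaction.max.timeout.ms=900000
```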
Downstream Kafka consumers of these messages also need to set their transaction isolation level to read committed.
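For a plain Kafka consumer this is a single property; note that the Kafka default is read_uncommitted, which is why uncommitted results would otherwise still be visible:

```properties
# Kafka consumer config: only deliver records from committed transactions.
isolation.level=read_committed
```

The kafka-console-consumer tool accepts the same setting via `--isolation-level read_committed`, which is a quick way to verify that results now only appear after each checkpoint completes.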
Summary
To achieve end-to-end exactly-once semantics, Flink needs the source, checkpointing, and the sink to work together. Configuring the checkpoint mode as exactly-once alone is not sufficient: the source must also support replay, and the sink must support transactions. Finally, the downstream consumers of the sink must read with read_committed isolation to prevent dirty reads.