接上文:一文说清flink从编码到部署上线
网上关于flink sink drois的例子较多,大部分不太全面,故本文详细说明,且提供完整代码。
flink doris版本对照表
1.添加依赖
<!--doris cdc-->
<!-- 参考:"https://doris.apache.org/zh-CN/docs/ecosystem/flink-doris-connector"版本对照表。
到"https://repo.maven.apache.org/maven2/org/apache/doris/"下载对应版本的jar-->
<!--mvn install:install-file -Dfile=D:/maven/flink-doris-connector-1.14_2.11-1.1.1.jar -DgroupId=com.flink.doris -DartifactId=flink-doris-connector-1.14_2.11 -Dversion=1.1.1 -Dpackaging=jar-->
<dependency>
<groupId>com.flink.doris</groupId>
<artifactId>flink-doris-connector-1.14_2.11</artifactId>
<version>1.1.1</version>
</dependency>
2.运行环境工具类
EnvUtil具体实现如下:
package com.zl.utils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;
/**
* EnvUtil
* @description:
*/
public class EnvUtil {
/**
* 设置flink执行环境
* @param parallelism 并行度
*/
public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
// System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为root
System.setProperty("HADOOP_USER_NAME", "root");
Configuration conf = new Configuration();
conf.setInteger("rest.port", 1000);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
if (parallelism >0 ){
//设置并行度
env.setParallelism(parallelism);
} else {
env.setParallelism(1);// 默认1
}
// 添加重启机制
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));
// 没有这个配置,会导致“Flink 任务没报错,但是无法同步数据到doris”。
// 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000
env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
//rocksdb状态后端,启用增量checkpoint
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
//设置checkpoint路径
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 同一时间只允许一个 checkpoint 进行(默认)
checkpointConfig.setMaxConcurrentCheckpoints(1);
//最小间隔,10*60*1000=60000
checkpointConfig.setMinPauseBetweenCheckpoints(60000);
// 取消任务后,checkpoint仍然保存
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//checkpoint容忍失败的次数
checkpointConfig.setTolerableCheckpointFailureNumber(5);
//checkpoint超时时间 默认10分钟
checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
//禁用operator chain(方便排查反压)
env.disableOperatorChaining();
return env;
}
}
3.CDC实现
相关sql详见文末代码:“resources/doris”。
3.1 创建doris数据库脚本
CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (`dt` date NULL COMMENT '分区时间',
`uuid` varchar(30) NULL COMMENT '',
`report_time` bigint(20) NULL COMMENT '过车时间'
) ENGINE=OLAP
DUPLICATE KEY(`dt`, `uuid`)
COMMENT 'RV 数据'
PARTITION BY RANGE(`dt`)
()
DISTRIBUTED BY HASH(`uuid