flink sink doris

接上文:一文说清flink从编码到部署上线
网上关于flink sink drois的例子较多,大部分不太全面,故本文详细说明,且提供完整代码。

flink doris版本对照表
在这里插入图片描述

1.添加依赖

<!--doris cdc-->
        <!-- 参考:"https://doris.apache.org/zh-CN/docs/ecosystem/flink-doris-connector"版本对照表。
        到"https://repo.maven.apache.org/maven2/org/apache/doris/"下载对应版本的jar-->
        <!--mvn install:install-file -Dfile=D:/maven/flink-doris-connector-1.14_2.11-1.1.1.jar -DgroupId=com.flink.doris -DartifactId=flink-doris-connector-1.14_2.11 -Dversion=1.1.1 -Dpackaging=jar-->
        <dependency>
            <groupId>com.flink.doris</groupId>
            <artifactId>flink-doris-connector-1.14_2.11</artifactId>
            <version>1.1.1</version>
        </dependency>

2.运行环境工具类

EnvUtil具体实现如下:

package com.zl.utils;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;
import java.time.ZoneOffset;
import java.util.concurrent.TimeUnit;

/**
 * EnvUtil
 * @description:
 */
public class EnvUtil {
   
    /**
     * 设置flink执行环境
     * @param parallelism 并行度
     */
    public static StreamExecutionEnvironment setFlinkEnv(int parallelism) {
   
        // System.setProperty("HADOOP_USER_NAME", "用户名") 对应的是 hdfs文件系统目录下的路径:/user/用户名的文件夹名,本文为root
        System.setProperty("HADOOP_USER_NAME", "root");
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        if (parallelism >0 ){
   
            //设置并行度
            env.setParallelism(parallelism);
        } else {
   
            env.setParallelism(1);// 默认1
        }

        // 添加重启机制
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, Time.minutes(6)));
        // 没有这个配置,会导致“Flink 任务没报错,但是无法同步数据到doris”。
        // 启动checkpoint,设置模式为精确一次 (这是默认值),10*60*1000=60000
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        //rocksdb状态后端,启用增量checkpoint
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        //设置checkpoint路径
        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // 同一时间只允许一个 checkpoint 进行(默认)
        checkpointConfig.setMaxConcurrentCheckpoints(1);
        //最小间隔,10*60*1000=60000
        checkpointConfig.setMinPauseBetweenCheckpoints(60000);
        // 取消任务后,checkpoint仍然保存
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //checkpoint容忍失败的次数
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        //checkpoint超时时间 默认10分钟
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));
        //禁用operator chain(方便排查反压)
        env.disableOperatorChaining();
        return env;
    }
}

3.CDC实现

相关sql详见文末代码:“resources/doris”。

3.1 创建doris数据库脚本

CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;

DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (`dt` date NULL COMMENT '分区时间',
                         `uuid` varchar(30) NULL COMMENT '',
                         `report_time` bigint(20) NULL COMMENT '过车时间'
) ENGINE=OLAP
DUPLICATE KEY(`dt`, `uuid`)
COMMENT 'RV 数据'
PARTITION BY RANGE(`dt`)
()
DISTRIBUTED BY HASH(`uuid
Apache Flink 可以通过 JDBC 连接器连接到 Doris,以实现将 Flink 数据流写入 Doris 表的目标。下面是 Flink 连接 Doris 的步骤: 1. 首先,需要将 Doris JDBC 驱动程序添加到 Flink 项目的依赖项中。可以在 Maven 中添加以下依赖项: ``` <dependency> <groupId>org.apache.calcite.avatica</groupId> <artifactId>avatica-core</artifactId> <version>1.13.0</version> </dependency> <dependency> <groupId>org.apache.doris</groupId> <artifactId>doris-jdbc</artifactId> <version>0.14.0</version> </dependency> ``` 2. 在 Flink 程序中创建一个 JDBC 连接,并将其用于写入 Doris。下面是一个示例代码片段: ``` // 创建 Doris 的 JDBC 连接 Properties properties = new Properties(); properties.put("user", "root"); properties.put("password", "root"); Connection dorisConnection = DriverManager.getConnection("jdbc:doris://localhost:9030/test", properties); // 将 Flink 数据流写入 Doris 表 DataStream<Tuple2<String, Integer>> dataStream = ...; dataStream.addSink(JdbcSink.sink( "INSERT INTO `test`.`word_count` (`word`, `count`) VALUES (?, ?)", (ps, t) -> { ps.setString(1, t.f0); ps.setInt(2, t.f1); }, new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withUrl("jdbc:doris://localhost:9030/test") .withUsername("root") .withPassword("root") .build(), new JdbcExecutionOptions.JdbcExecutionOptionsBuilder() .withBatchSize(100) .build() )); ``` 在这个示例中,我们使用 `JdbcSink` 将 Flink 数据流写入 Doris 表。`JdbcSink` 接受 SQL 查询字符串、参数设置函数和 JDBC 连接选项作为参数。在参数设置函数中,我们将 Flink 数据流中的元组转换为 SQL 参数。`JdbcConnectionOptions` 和 `JdbcExecutionOptions` 用于配置 JDBC 连接选项和执行选项。 3. 在 Doris 中创建一个表,以存储 Flink 数据流。下面是一个示例 SQL 语句: ``` CREATE TABLE `test`.`word_count` ( `word` varchar(255) NOT NULL, `count` int(11) DEFAULT NULL, PRIMARY KEY (`word`) ) ENGINE=OLAP ``` 这个示例创建了一个名为 `word_count` 的表,其中包含两个列:`word` 和 `count`。`word` 列是主键列。 通过以上步骤,我们就可以将 Flink 数据流写入 Doris 表了。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

core321

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值