通过 Flink CDC CLI 提交任务:整库同步 MySQL 到 Doris

Apache Flink 是一款流处理框架,Flink CDC(Change Data Capture)是 Flink 提供的一个开源工具,它可以实时捕获数据库(如 MySQL)中的数据变更并将这些变更流式地传输到其他系统中(如 Doris)。通过 Flink CDC,可以实现从 MySQL 到 Doris 的数据同步。

本文将详细讲解如何通过 Flink CDC CLI 提交任务,实现整库同步 MySQL 到 Doris。我们将展示详细的配置和代码。

前提条件

  • Flink 集群:确保 Flink 集群已经搭建完成。
  • Doris 集群:确保 Doris 集群已经搭建完成。
  • MySQL 数据库:需要从 MySQL 中同步数据。
  • Flink CDC jar 包:确保下载并配置了 Flink CDC 的相关 jar 包。

步骤概览

  1. 配置 Flink CDC 连接器。
  2. 编写 Flink CDC 作业代码。
  3. 提交 Flink CDC 任务到 Flink 集群。
  4. 监控任务执行。

1. 配置 Flink CDC 连接器

首先,需要在 Flink 配置文件中添加 Flink CDC 相关的连接器。Flink CDC 提供了 MySQL 和 Doris 的连接器,确保将相关的连接器 jar 包放到 Flink 集群的 lib 目录中。

下载 Flink CDC 依赖

你可以从 Flink CDC 官方 GitHub 下载适合的版本。

在项目中使用 Maven 来集成依赖:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.0.0</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-doris</artifactId>
    <version>1.13.0</version>
</dependency>

2. 编写 Flink CDC 作业代码

通过 Flink CDC,你可以实现 MySQL 数据库到 Doris 的整库同步。Flink CDC 会捕获 MySQL 数据库的增量变化(如 INSERT、UPDATE、DELETE 操作),并将这些变化实时地推送到 Doris。

以下是一个简单的 Flink CDC 作业,展示如何将 MySQL 数据库的变更同步到 Doris:

MySQL 到 Doris 的同步代码

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.doris.DorisSink;
import org.apache.flink.streaming.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.connectors.mysql.table.MySQLTableSource;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.JdbcSink;

public class MySQLToDorisSync {

    public static void main(String[] args) throws Exception {
        // 创建 Flink 流式环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        // MySQL 配置
        String mysqlHost = "localhost";
        String mysqlPort = "3306";
        String mysqlDatabase = "your_database";
        String mysqlUser = "your_username";
        String mysqlPassword = "your_password";
        
        // Doris 配置
        String dorisHost = "localhost";
        String dorisPort = "8030";  // Doris FE 服务端口
        String dorisDatabase = "your_doris_database";
        String dorisTable = "your_doris_table";
        
        // 从 MySQL 获取增量数据
        MySQLSource<String> mySQLSource = MySQLSource.<String>builder()
                .hostname(mysqlHost)
                .port(Integer.parseInt(mysqlPort))
                .databaseList(mysqlDatabase)
                .tableList("your_database.*") // 选择需要同步的表
                .username(mysqlUser)
                .password(mysqlPassword)
                .startupMode(MySQLSource.StartupMode.LATEST_OFFSET) // 从最新的偏移量开始同步
                .deserializer(new MySQLRowDataDebeziumDeserializationSchema())  // 自定义反序列化 schema
                .build();

        // 设置 Flink 任务的数据源
        DataStream<String> mysqlStream = env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");

        // 将数据流写入到 Doris
        DorisSink<String> dorisSink = DorisSink.<String>builder()
                .setHost(dorisHost)
                .setPort(dorisPort)
                .setDatabase(dorisDatabase)
                .setTable(dorisTable)
                .setUsername("doris_user")
                .setPassword("doris_password")
                .build();

        // 向 Doris 写数据
        mysqlStream.sinkTo(dorisSink);

        // 执行 Flink 作业
        env.execute("MySQL to Doris Sync");
    }
}

关键点解释:

  • MySQLSource:从 MySQL 中读取数据,通过 Flink CDC 捕获 MySQL 中的数据变更。
  • DorisSink:将捕获到的数据变更写入到 Doris 集群。
  • StartupMode.LATEST_OFFSET:表示从 MySQL 数据库的最新位置开始同步。
  • 自定义反序列化 Schema:用于将 MySQL 的变更事件转换为 Flink 可识别的数据格式(此处使用了 MySQLRowDataDebeziumDeserializationSchema)。

3. 提交 Flink CDC 任务到 Flink 集群

完成代码编写后,我们可以通过 Flink CLI 提交任务到 Flink 集群。

3.1 编译和打包

首先,使用 Maven 打包 Flink 作业:

mvn clean package -DskipTests

3.2 提交作业

使用 Flink CLI 提交作业:

flink run -c com.example.MySQLToDorisSync /path/to/your/flink-job.jar

3.3 监控任务

Flink 提供了 Web UI 用于监控任务执行情况。通过浏览器访问 Flink Web UI(通常是 http://localhost:8081),可以查看作业的状态、任务的执行情况、以及各个节点的资源利用情况。

4. 完整代码示例

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.doris.DorisSink;
import org.apache.flink.streaming.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.connectors.mysql.table.MySQLTableSource;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class MySQLToDorisSync {

    public static void main(String[] args) throws Exception {
        // Flink 环境初始化
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // MySQL 配置
        String mysqlHost = "localhost";
        String mysqlPort = "3306";
        String mysqlDatabase = "your_database";
        String mysqlUser = "your_username";
        String mysqlPassword = "your_password";
        
        // Doris 配置
        String dorisHost = "localhost";
        String dorisPort = "8030";
        String dorisDatabase = "your_doris_database";
        String dorisTable = "your_doris_table";

        // MySQL CDC 源
        MySQLSource<String> mysqlSource = MySQLSource.<String>builder()
                .hostname(mysqlHost)
                .port(Integer.parseInt(mysqlPort))
                .databaseList(mysqlDatabase)
                .tableList("your_database.*")
                .username(mysqlUser)
                .password(mysqlPassword)
                .startupMode(MySQLSource.StartupMode.LATEST_OFFSET)
                .deserializer(new MySQLRowDataDebeziumDeserializationSchema())
                .build();

        // 创建数据流
        DataStream<String> mysqlStream = env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");

        // Doris Sink 配置
        DorisSink<String> dorisSink = DorisSink.<String>builder()
                .setHost(dorisHost)
                .setPort(dorisPort)
                .setDatabase(dorisDatabase)
                .setTable(dorisTable)
                .setUsername("doris_user")
                .setPassword("doris_password")
                .build();

        // 将数据写入 Doris
        mysqlStream.sinkTo(dorisSink);

        // 执行 Flink 作业
        env.execute("MySQL to Doris Sync");
    }
}

总结

通过 Flink CDC,我们可以实现 MySQL 到 Doris 的整库同步。这个流程的关键是配置好 MySQL 的增量捕获、并将数据变更流式地写入 Doris。通过 Flink CLI,我们可以方便地提交作业并监控任务执行情况。这个技术方案可以在数据同步、实时数据仓库等场景中应用。

评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包

打赏作者

一只蜗牛儿

你的鼓励将是我创作的最大动力

¥1 ¥2 ¥4 ¥6 ¥10 ¥20
扫码支付:¥1
获取中
扫码支付

您的余额不足,请更换扫码支付或充值

打赏作者

实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值