Apache Flink 是一款流处理框架,Flink CDC(Change Data Capture)是 Flink 提供的一个开源工具,它可以实时捕获数据库(如 MySQL)中的数据变更并将这些变更流式地传输到其他系统中(如 Doris)。通过 Flink CDC,可以实现从 MySQL 到 Doris 的数据同步。
本文将详细讲解如何通过 Flink CDC CLI 提交任务,实现整库同步 MySQL 到 Doris。我们将展示详细的配置和代码。
前提条件
- Flink 集群:确保 Flink 集群已经搭建完成。
- Doris 集群:确保 Doris 集群已经搭建完成。
- MySQL 数据库:需要从 MySQL 中同步数据。
- Flink CDC jar 包:确保下载并配置了 Flink CDC 的相关 jar 包。
步骤概览
- 配置 Flink CDC 连接器。
- 编写 Flink CDC 作业代码。
- 提交 Flink CDC 任务到 Flink 集群。
- 监控任务执行。
1. 配置 Flink CDC 连接器
首先,需要在 Flink 配置文件中添加 Flink CDC 相关的连接器。Flink CDC 提供了 MySQL 和 Doris 的连接器,确保将相关的连接器 jar 包放到 Flink 集群的 lib 目录中。
下载 Flink CDC 依赖
你可以从 Flink CDC 官方 GitHub 下载适合的版本。
在项目中使用 Maven 来集成依赖:
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-doris</artifactId>
<version>1.13.0</version>
</dependency>
2. 编写 Flink CDC 作业代码
通过 Flink CDC,你可以实现 MySQL 数据库到 Doris 的整库同步。Flink CDC 会捕获 MySQL 数据库的增量变化(如 INSERT、UPDATE、DELETE 操作),并将这些变化实时地推送到 Doris。
以下是一个简单的 Flink CDC 作业,展示如何将 MySQL 数据库的变更同步到 Doris:
MySQL 到 Doris 的同步代码
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.doris.DorisSink;
import org.apache.flink.streaming.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.connectors.mysql.table.MySQLTableSource;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.jdbc.JdbcSink;
public class MySQLToDorisSync {
public static void main(String[] args) throws Exception {
// 创建 Flink 流式环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// MySQL 配置
String mysqlHost = "localhost";
String mysqlPort = "3306";
String mysqlDatabase = "your_database";
String mysqlUser = "your_username";
String mysqlPassword = "your_password";
// Doris 配置
String dorisHost = "localhost";
String dorisPort = "8030"; // Doris FE 服务端口
String dorisDatabase = "your_doris_database";
String dorisTable = "your_doris_table";
// 从 MySQL 获取增量数据
MySQLSource<String> mySQLSource = MySQLSource.<String>builder()
.hostname(mysqlHost)
.port(Integer.parseInt(mysqlPort))
.databaseList(mysqlDatabase)
.tableList("your_database.*") // 选择需要同步的表
.username(mysqlUser)
.password(mysqlPassword)
.startupMode(MySQLSource.StartupMode.LATEST_OFFSET) // 从最新的偏移量开始同步
.deserializer(new MySQLRowDataDebeziumDeserializationSchema()) // 自定义反序列化 schema
.build();
// 设置 Flink 任务的数据源
DataStream<String> mysqlStream = env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
// 将数据流写入到 Doris
DorisSink<String> dorisSink = DorisSink.<String>builder()
.setHost(dorisHost)
.setPort(dorisPort)
.setDatabase(dorisDatabase)
.setTable(dorisTable)
.setUsername("doris_user")
.setPassword("doris_password")
.build();
// 向 Doris 写数据
mysqlStream.sinkTo(dorisSink);
// 执行 Flink 作业
env.execute("MySQL to Doris Sync");
}
}
关键点解释:
- MySQLSource:从 MySQL 中读取数据,通过 Flink CDC 捕获 MySQL 中的数据变更。
- DorisSink:将捕获到的数据变更写入到 Doris 集群。
- StartupMode.LATEST_OFFSET:表示从 MySQL 数据库的最新位置开始同步。
- 自定义反序列化 Schema:用于将 MySQL 的变更事件转换为 Flink 可识别的数据格式(此处使用了
MySQLRowDataDebeziumDeserializationSchema
)。
3. 提交 Flink CDC 任务到 Flink 集群
完成代码编写后,我们可以通过 Flink CLI 提交任务到 Flink 集群。
3.1 编译和打包
首先,使用 Maven 打包 Flink 作业:
mvn clean package -DskipTests
3.2 提交作业
使用 Flink CLI 提交作业:
flink run -c com.example.MySQLToDorisSync /path/to/your/flink-job.jar
3.3 监控任务
Flink 提供了 Web UI 用于监控任务执行情况。通过浏览器访问 Flink Web UI(通常是 http://localhost:8081
),可以查看作业的状态、任务的执行情况、以及各个节点的资源利用情况。
4. 完整代码示例
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.doris.DorisSink;
import org.apache.flink.streaming.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.connectors.mysql.table.MySQLTableSource;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.datastream.DataStream;
public class MySQLToDorisSync {
public static void main(String[] args) throws Exception {
// Flink 环境初始化
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// MySQL 配置
String mysqlHost = "localhost";
String mysqlPort = "3306";
String mysqlDatabase = "your_database";
String mysqlUser = "your_username";
String mysqlPassword = "your_password";
// Doris 配置
String dorisHost = "localhost";
String dorisPort = "8030";
String dorisDatabase = "your_doris_database";
String dorisTable = "your_doris_table";
// MySQL CDC 源
MySQLSource<String> mysqlSource = MySQLSource.<String>builder()
.hostname(mysqlHost)
.port(Integer.parseInt(mysqlPort))
.databaseList(mysqlDatabase)
.tableList("your_database.*")
.username(mysqlUser)
.password(mysqlPassword)
.startupMode(MySQLSource.StartupMode.LATEST_OFFSET)
.deserializer(new MySQLRowDataDebeziumDeserializationSchema())
.build();
// 创建数据流
DataStream<String> mysqlStream = env.fromSource(mysqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source");
// Doris Sink 配置
DorisSink<String> dorisSink = DorisSink.<String>builder()
.setHost(dorisHost)
.setPort(dorisPort)
.setDatabase(dorisDatabase)
.setTable(dorisTable)
.setUsername("doris_user")
.setPassword("doris_password")
.build();
// 将数据写入 Doris
mysqlStream.sinkTo(dorisSink);
// 执行 Flink 作业
env.execute("MySQL to Doris Sync");
}
}
总结
通过 Flink CDC,我们可以实现 MySQL 到 Doris 的整库同步。这个流程的关键是配置好 MySQL 的增量捕获、并将数据变更流式地写入 Doris。通过 Flink CLI,我们可以方便地提交作业并监控任务执行情况。这个技术方案可以在数据同步、实时数据仓库等场景中应用。