clickhouse通过MaterializeMySQL库引擎实现mysql实时同步

本文介绍了如何利用ClickHouse的MaterializeMySQL引擎实现与MySQL的实时数据同步,包括全量和增量同步、DDL和DML转换规则、SELECT查询处理、索引转换以及注意事项。案例部分详细展示了开启MySQL binlog和GTID模式,创建复制管道,以及在MySQL和ClickHouse间修改、删除数据的过程。

摘要生成于 C知道 ,由 DeepSeek-R1 满血版支持, 前往体验 >

一.概述

为了能够增强数据的实时性,利用 binlog 将数据写入到 ClickHouse。然而为了能够监听 binlog 事件,需要用到类似 canal 这样的第三方中间件,这无疑增加了系统的复杂度。

ClickHouse 20.8.2.3 版本新增加了 MaterializeMySQL 的 database 引擎,该 database 能映 射 到 MySQL 中 的 某 个 database , 并 自 动 在 ClickHouse 中 创 建 对 应 的

ReplacingMergeTree。ClickHouse 服务做为 MySQL 副本,读取 Binlog 并执行 DDL 和 DML 请求,实现了基于 MySQL Binlog 机制的业务数据库实时同步功能。

1.特点

(1)MaterializeMySQL 同时支持全量和增量同步,在 database 创建之初会全量同步MySQL 中的表和数据,之后则会通过 binlog 进行增量同步。

(2)MaterializeMySQL database 为其所创建的每张 ReplacingMergeTree 自动增加了_sign 和 _version 字段。

其中,_version 用作 ReplacingMergeTree 的 ver 版本参数,每当监听到 insert、update和 delete 事件时,在 databse 内全局自增。而 _sign 则用于标记是否被删除,取值 1 或

者 -1。

目前 MaterializeMySQL 支持如下几种 binlog 事件:

➢MYSQL_WRITE_ROWS_EVENT:_sign = 1,_version ++

➢MYSQL_DELETE_ROWS_EVENT:_sign = -1,_version ++

➢MYSQL_UPDATE_ROWS_EVENT:新数据 _sign = 1

➢MYSQL_QUERY_EVENT: 支持 CREATE TABLE 、DROP TABLE 、RENAME TABLE 等。

即支持mysql 5.6/5.7/8.0版本数据库,兼容insert,update,delete,alter,create,drop,truncate等大部分DDL操作。

2.使用细则

(1)DDL 查询

MySQL DDL 查询被转换成相应的 ClickHouse DDL 查询(ALTER, CREATE, DROP, RENAME)。如果 ClickHouse 不能解析某些 DDL 查询,该查询将被忽略。

(2)数据复制

MaterializeMySQL 不支持直接插入、删除和更新查询,而是将 DDL 语句进行相应转换:

①MySQL INSERT 查询被转换为 INSERT with _sign=1。

②MySQL DELETE 查询被转换为 INSERT with _sign=-1。

③MySQL UPDATE 查询被转换成 INSERT with _sign=1 和 INSERT with _sign=-1。

即使用MaterializedMySQL数据库引擎时,ReplacingMergeTree表与虚拟_sign和_version列一起使用。

  • _version— 交易计数器。键入UInt64。
  • _sign— 删除标记。键入Int8。可能的值:
    • 1— 未删除行,
    • -1— 行被删除。

(3)SELECT 查询

如果在 SELECT 查询中没有指定_version,则使用 FINAL 修饰符,返回_version 的最大值对应的数据,即最新版本的数据。

如果在 SELECT 查询中没有指定_sign,则默认使用 WHERE _sign=1,即返回未删除状态(_sign=1)的数据。

(4)索引转换

ClickHouse 数据库表会自动将 MySQL 主键和索引子句转换为 ORDER BY 元组。

ClickHouse 只有一个物理顺序,由 ORDER BY 子句决定。如果需要创建新的物理顺序,请使用物化视图。

  • _sign=-1没有从表中物理删除的行。

  • UPDATE/DELETE引擎不支持级联查询MaterializedMySQL,因为它们在 MySQL 二进制日志中不可见。

  • 复制很容易被破坏。

  • 禁止对数据库和表进行手动操作。

  • MaterializedMySQL受optimize_on_insert 设置影响。MaterializedMySQL当 MySQL 服务器中的表发生变化时,数据会合并到数据库中的相应表中。

(5)类型转换

MySQLClickHouse
TINYInt8
SHORTInt16
INT24Int32
LONGUInt32
LONGLONGUInt64
FLOATFloat32
DOUBLEFloat64
DECIMAL, NEWDECIMALDecimal
DATE, NEWDATEDate32
DATETIME, TIMESTAMPDateTime
DATETIME2, TIMESTAMP2DateTime64
YEARUInt16
TIMEInt64
ENUMEnum
STRINGString
VARCHAR, VAR_STRINGString
BLOBString
GEOMETRYString
BINARYFixedString
BITUInt64
SETUInt64

(6)创建语句以及配置参数

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializedMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...]
[TABLE OVERRIDE table1 (...), TABLE OVERRIDE table2 (...)]

引擎参数

  • host:port— MySQL 服务器端点。

  • database— MySQL 数据库名称。

  • user— MySQL 用户。

  • password- 用户密码。

引擎设置

  • max_rows_in_buffer— 允许数据在内存中缓存的最大行数(对于单个表且缓存数据无法查询)。当超过这个数字时,数据将被物化。默认值:65505。

  • max_bytes_in_buffer— 允许数据在内存中缓存的最大字节数(对于单个表且缓存数据无法查询)。当超过这个数字时,数据将被物化。默认值:1048576。

  • max_rows_in_buffers— 允许数据在内存中缓存的最大行数(对于数据库和缓存数据无法查询)。当超过这个数字时,数据将被物化。默认值:65505。

  • max_bytes_in_buffers— 允许数据在内存中缓存的最大字节数(对于数据库和缓存数据无法查询)。当超过这个数字时,数据将被物化。默认值:1048576。

  • max_flush_data_time— 允许数据在内存中缓存的最大毫秒数(对于数据库和缓存数据无法查询)。当超过这个时间时,数据将被物化。默认值:1000。

  • max_wait_time_when_mysql_unavailable— MySQL 不可用时的重试间隔(毫秒)。负值禁用重试。默认值:1000。

  • allows_query_when_mysql_lost— 允许在 MySQL 丢失时查询物化表。默认值:( 0) false。

  • materialized_mysql_tables_list— 以逗号分隔的 mysql 数据库表列表,将由 MaterializedMySQL 数据库引擎复制。默认值:空列表 — 表示将复制整个表。

(7)表覆盖

表覆盖可用于自定义 ClickHouse DDL 查询,允许您为应用程序进行架构优化。这对于控制分区特别有用,这对 MaterializedMySQL 的整体性能很重要。

这些是您可以对 MaterializedMySQL 的表覆盖进行的模式转换操作:

  • 修改列类型。必须与原始类型兼容,否则复制将失败。例如,您可以将 UInt32 列修改为 UInt64,但不能将 String 列修改为 Array(String)。
  • 修改列 TTL。
  • 修改列压缩编解码器。
  • 添加别名列。
  • 添加跳过索引
  • 添加投影。请注意,使用时会禁用投影优化SELECT … FINAL(MaterializedMySQL 默认会这样做),因此它们的实用性在这里受到限制。
  • 修改PARTITION BY
  • 修改ORDER BY
  • 修改主键
  • 添加样品
  • 添加表 TTL
CREATE DATABASE db_name ENGINE = MaterializedMySQL(...)
[SETTINGS ...]
[TABLE OVERRIDE table_name (
    [COLUMNS (
        [col_name [datatype] [ALIAS expr] [CODEC(...)] [TTL expr], ...]
        [INDEX index_name expr TYPE indextype[(...)] GRANULARITY val, ...]
        [PROJECTION projection_name (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]), ...]
    )]
    [ORDER BY expr]
    [PRIMARY KEY expr]
    [PARTITION BY expr]
    [SAMPLE BY expr]
    [TTL expr]
), ...]

示例:

CREATE DATABASE db_name ENGINE = MaterializedMySQL(...)
TABLE OVERRIDE table1 (
    COLUMNS (
        userid UUID,
        category LowCardinality(String),
        timestamp DateTime CODEC(Delta, Default)
    )
    PARTITION BY toYear(timestamp)
),
TABLE OVERRIDE table2 (
    COLUMNS (
        client_ip String TTL created + INTERVAL 72 HOUR
    )
    SAMPLE BY ip_hash
)

COLUMNS名单很稀疏;根据指定修改现有列,添加额外的 ALIAS 列。无法添加普通或 MATERIALIZED 列。具有不同类型的修改列必须可以从原始类型分配。目前在CREATE DATABASE执行查询时没有验证此问题或类似问题,因此需要格外小心。

可以为尚不存在的表指定覆盖。

!!!警告如果不小心使用,很容易使用表覆盖来破坏复制。

(8)注意事项

①MySQL中的每个表都应包含PRIMARY KEY.

②表的复制,那些包含ENUM字段值超出范围(在ENUM签名中指定)的行将不起作用。

③如果 ClickHouse 无法解析某些 DDL 查询,则忽略该查询。

二.案例实操

1. MySQL 开启 binlog 和 GTID 模式

(1)确保 MySQL 开启了 binlog 功能,且格式为 ROW

打开/etc/my.cnf,在[mysqld]下添加:

server-id=1 
log-bin=mysql-bin
binlog_format=ROW

(2)开启 GTID 模式

如果 clickhouse 使用的是 20.8 prestable 之后发布的版本,那么 MySQL 还需要配置开启 GTID 模式, 这种方式在 mysql 主从模式下可以确保数据同步的一致性(主从切换时)。

gtid-mode=on
enforce-gtid-consistency=1 # 设置为主从强一致性
log-slave-updates=1 # 记录日志

GTID 是 MySQL 复制增强版,从 MySQL 5.6 版本开始支持,目前已经是 MySQL 主流。复制模式。它为每个 event 分配一个全局唯一 ID 和序号,我们可以不用关心 MySQL 集群主从拓扑结构,直接告知 MySQL 这个 GTID 即可。

查询以下语句进行验证:

show variables like '%gtid_mode%';
show variables like '%enforce_gtid_consistency%';
show variables like '%binlog_format%';
gtid_mode    ON
enforce_gtid_consistency    ON
binlog_format    ROW

如以上值,则代表可以进行同步。

(3)重启 MySQL

sudo systemctl restart mysqld

(4)用户

同步用户建议最高级用户(拥有all权限)或者用户有RELOAD, REPLICATION SLAVE, REPLICATION CLIENT相关权限。

如为普通用户【5.7版本mysql】需执行 {必须 . 才是服务服务器权限}

grant select,reload,replication slave,replication client on *.* to syncdata@'%' identified  by "123";
flush privileges;--刷新权限

2. 准备 MySQL 表和数据

(1)在 MySQL 中创建数据表并写入数据

CREATE DATABASE testck;
CREATE TABLE `testck`.`t_organization` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `code` int NOT NULL,
 `name` text DEFAULT NULL,
 `updatetime` datetime DEFAULT NULL,
 PRIMARY KEY (`id`),
 UNIQUE KEY (`code`)
) ENGINE=InnoDB;

INSERT INTO testck.t_organization (code, name,updatetime) 
VALUES(1000,'Realinsight',NOW());
INSERT INTO testck.t_organization (code, name,updatetime) 
VALUES(1001, 'Realindex',NOW());
INSERT INTO testck.t_organization (code, name,updatetime) 
VALUES(1002,'EDT',NOW());

(2)创建第二张表

CREATE TABLE `testck`.`t_user` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `code` int,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB;


INSERT INTO testck.t_user (code) VALUES(1);

3. 开启 ClickHouse 物化引擎

注意:以下语句如果运行设置找不到,需要先在users.xml开启调用窗口函数使用状态在/etc/clickhouse-server/users.xml 配置文件中加入<allow_experimental_window_functions>1</allow_experimental_window_functions>

<?xml version="1.0"?>
<yandex>
<!-- Profiles of settings. -->
<profiles>
<!-- Default settings. -->
<default>
<!-- Maximum memory usage for processing single query, in bytes. -->
<max_memory_usage>10000000000</max_memory_usage>
 
<!-- How to choose between replicas during distributed query processing.
random - choose random replica from set of replicas with minimum number of errors
nearest_hostname - from set of replicas with minimum number of errors, choose replica
with minimum number of different symbols between replica's hostname and local hostname
(Hamming distance).
in_order - first live replica is chosen in specified order.
first_or_random - if first replica one has higher number of errors, pick a random one from replicas with minimum number of errors.
-->
<load_balancing>random</load_balancing>
<allow_experimental_window_functions>1</allow_experimental_window_functions>
</default>
 
<!-- Profile that allows only read queries. -->
<readonly>
<readonly>1</readonly>
</readonly>
</profiles>
......

然后开启 MaterializeMySQL 库引擎操作权限,然后配置文件中的allow_experimental_window_functions 可以去除【高版本会提示过时,旧版本可能需要保留,看登录的提示信息】

注意每次登录默认是关闭的,需要使用再操作打开。

set allow_experimental_database_materialize_mysql=1;

4. 创建复制管道

(1)ClickHouse 中创建 MaterializeMySQL 数据库

CREATE DATABASE test_binlog ENGINE = MaterializeMySQL('hadoop1:3306','testck','root','000000');

其中 4 个参数分别是 MySQL 地址、databse、username 和 password。

(2)查看 ClickHouse 的数据

use test_binlog;
show tables;
select * from t_organization;
select * from t_user;

5. 修改数据

(1)在 MySQL 中修改数据:

update t_organization set name = CONCAT(name,'-v1') where id = 1

(2)查看 clickhouse 日志可以看到 binlog 监听事件,查询 clickhouse

select * from t_organization;

6. 删除数据

(1)MySQL 删除数据:

DELETE FROM t_organization where id = 2;

(2)ClicKHouse,日志有 DeleteRows 的 binlog 监听事件,查看数据:

select * from t_organization;

(3)在刚才的查询中增加 _sign 和 _version 虚拟字段

select *,_sign,_version from t_organization order by _sign 
desc,_version desc;

在查询时,对于已经被删除的数据,_sign=-1,ClickHouse 会自动重写 SQL,将 _sign = -1 的数据过滤掉;

对于修改的数据,则自动重写 SQL,为其增加 FINAL 修饰符。

select * from t_organization
--等同于
select * from t_organization final where _sign = 1

7. 删除表

(1)在 mysql 执行删除表

drop table t_user;

(2)此时在 clickhouse 处会同步删除对应表,如果查询会报错

show tables;
select * from t_user;
DB::Exception: Table scene_mms.scene doesn't exist..

(3)mysql 新建表,clickhouse 可以查询到

CREATE TABLE `testck`.`t_user` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `code` int,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB;

INSERT INTO testck.t_user (code) VALUES(1);

#ClickHouse 查询
show tables;
select * from t_user;
评论 3
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值