Migrating from MySQL with Kafka Connect
Capabilities: CDC, Incremental, Full Load
This tutorial shows how to build a real-time data pipeline from MySQL to Databend using Kafka Connect.
Overview
Kafka Connect is a tool for streaming data between Apache Kafka and other systems reliably and at scale. It simplifies building real-time data pipelines by standardizing data movement into and out of Kafka. For a MySQL-to-Databend migration, Kafka Connect provides a pipeline that enables:
- Real-time data synchronization from MySQL to Databend
- Automatic schema evolution and table creation
- Support for both new data capture and updates to existing data
The migration pipeline consists of two main components:
- MySQL JDBC Source Connector: Reads data from MySQL and publishes it to Kafka topics
- Databend Sink Connector: Consumes data from Kafka topics and writes it to Databend
Prerequisites
- MySQL database with data you want to migrate
- Apache Kafka installed (see the Kafka quickstart guide)
- Databend instance running
- Basic knowledge of SQL and command line
Step 1: Set Up Kafka Connect
Kafka Connect supports two execution modes: Standalone and Distributed. For this tutorial, we'll use Standalone mode, which is simpler for testing.
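For production deployments you would typically run Distributed mode instead; Kafka ships a launcher script for it alongside the standalone one:
bin/connect-distributed.sh config/connect-distributed.properties
The rest of this tutorial assumes Standalone mode.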
Configure Kafka Connect
Create a basic worker configuration file connect-standalone.properties in your Kafka config directory:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
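Because key.converter.schemas.enable and value.converter.schemas.enable are set to true, the JsonConverter wraps every record in a schema/payload envelope. As an illustration, a source row with hypothetical columns id and name would appear on the Kafka topic roughly as:
{
  "schema": {
    "type": "struct",
    "fields": [
      {"type": "int32", "optional": false, "field": "id"},
      {"type": "string", "optional": true, "field": "name"}
    ],
    "optional": false
  },
  "payload": {"id": 1, "name": "alice"}
}
The sink connector relies on this embedded schema to create and evolve tables in Databend.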
Step 2: Configure MySQL Source Connector
Install Required Components
- Download the Kafka Connect JDBC plugin from Confluent Hub and extract it to your Kafka libs directory
- Download the MySQL JDBC Driver and copy the JAR file to the same libs directory
Create MySQL Source Configuration
Create a file mysql-source.properties in your Kafka config directory with the following content:
name=mysql-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# Connection settings
connection.url=jdbc:mysql://localhost:3306/your_database?useSSL=false
connection.user=your_username
connection.password=your_password
# Table selection and topic naming
table.whitelist=your_table
# The JDBC source connector publishes each table to <topic.prefix><table>,
# so this configuration produces a topic named mysql_your_table
topic.prefix=mysql_
# Sync mode configuration
mode=incrementing
incrementing.column.name=id
# Polling frequency
poll.interval.ms=5000
Replace the following values with your actual MySQL configuration:
- your_database: Your MySQL database name
- your_username: MySQL username
- your_password: MySQL password
- your_table: The table you want to migrate (a sample table definition that fits this configuration follows below)
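For reference, incrementing mode expects a monotonically increasing key column. Here is a hypothetical table shape that fits this tutorial's configuration (the id, some_column, and updated_at names match the examples used throughout):
CREATE TABLE your_table (
  id INT AUTO_INCREMENT PRIMARY KEY,  -- read by incrementing.column.name
  some_column VARCHAR(100),
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    ON UPDATE CURRENT_TIMESTAMP       -- needed only for the timestamp modes below
);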
Sync Modes
The MySQL Source Connector supports three synchronization modes:
- Incrementing Mode: Best for tables with an auto-incrementing ID column (captures new rows only, not updates)
  mode=incrementing
  incrementing.column.name=id
- Timestamp Mode: Best for capturing both inserts and updates; requires a column that is updated on every write (see the ALTER TABLE example below if your table lacks one)
  mode=timestamp
  timestamp.column.name=updated_at
- Timestamp+Incrementing Mode: Most reliable for all changes
  mode=timestamp+incrementing
  incrementing.column.name=id
  timestamp.column.name=updated_at
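If an existing table has no column that MySQL maintains on every write, one way to add one (assuming the updated_at name used above) is:
ALTER TABLE your_table
  ADD COLUMN updated_at TIMESTAMP
  DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP;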
Step 3: Configure Databend Sink Connector
Install Required Components
- Download the Databend Kafka Connector and place it in your Kafka libs directory
- Download the Databend JDBC Driver and copy it to the same libs directory
Create Databend Sink Configuration
Create a file databend-sink.properties in your Kafka config directory:
name=databend-sink
connector.class=com.databend.kafka.connect.DatabendSinkConnector
# Connection settings
connection.url=jdbc:databend://localhost:8000
connection.user=databend
connection.password=databend
connection.database=default
# Topic to table mapping
topics=mysql_your_table
table.name.format=${topic}
# Table management
auto.create=true
auto.evolve=true
# Write behavior
insert.mode=upsert
pk.mode=record_value
pk.fields=id
batch.size=1000
Adjust the Databend connection settings as needed for your environment.
Step 4: Start the Migration Pipeline
Start Kafka Connect with both connector configurations:
bin/connect-standalone.sh config/connect-standalone.properties \
config/mysql-source.properties \
config/databend-sink.properties
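Once the worker is up, you can confirm that both connectors are running through the Kafka Connect REST API, which listens on port 8083 by default:
curl -s http://localhost:8083/connectors
curl -s http://localhost:8083/connectors/mysql-source/status
curl -s http://localhost:8083/connectors/databend-sink/status
Each status response should report the state RUNNING for the connector and its tasks.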
Step 5: Verify the Migration
Check Data Synchronization
- Monitor the Kafka Connect logs:
  tail -f /path/to/kafka/logs/connect.log
- Verify the data in Databend. Connect to your Databend instance and run:
  SELECT * FROM mysql_your_table LIMIT 10;
Test Schema Evolution
If you add a new column to your MySQL table, the schema change will propagate to Databend automatically once new rows containing that column flow through the pipeline:
- Add a column in MySQL:
  ALTER TABLE your_table ADD COLUMN new_field VARCHAR(100);
- Verify the schema update in Databend:
  DESC mysql_your_table;
Test Update Operations
To test updates, ensure you're using timestamp or timestamp+incrementing mode:
- Update your MySQL connector configuration: edit mysql-source.properties to use timestamp+incrementing mode if your table has a timestamp column.
- Update data in MySQL:
  UPDATE your_table SET some_column='new value' WHERE id=1;
- Verify the update in Databend:
  SELECT * FROM mysql_your_table WHERE id=1;
Key Features of Databend Kafka Connect
- Automatic Table and Column Creation: with auto.create and auto.evolve enabled, tables and columns are created automatically based on the Kafka topic data
- Schema Support: supports Avro, JSON Schema, and Protobuf input data formats (requires Schema Registry)
- Multiple Write Modes: supports both insert and upsert write modes
- Multi-task Support: can run multiple tasks to improve throughput (see the tasks.max sketch below)
- High Availability: in distributed mode, the workload is automatically balanced, with dynamic scaling and fault tolerance
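As a sketch of the multi-task setting (assuming the source topic has enough partitions for the extra tasks to share), you can raise the task count in databend-sink.properties:
tasks.max=4
With a single-partition topic, the additional sink tasks would sit idle, so increase partitions and tasks together.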
Troubleshooting
- Connector Not Starting: Check Kafka Connect logs for errors
- No Data in Databend: Verify the topic exists and contains data using the Kafka console consumer (see the example below)
- Schema Issues: Ensure auto.create and auto.evolve are set to true
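To inspect what the source connector has actually published, read the topic directly with the console consumer that ships with Kafka (the topic name assumes the topic.prefix configuration above):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic mysql_your_table --from-beginning --max-messages 5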