Building a Real-Time Change Data Capture Pipeline With Debezium, Kafka, and PostgreSQL

Set up a real-time Change Data Capture pipeline using PostgreSQL, Debezium, Kafka, and Python. Stream database changes into Kafka topics and more.

By Vinaychand Muppala · May 29, 2025 · Tutorial

Change Data Capture (CDC) is a foundational pattern in modern data engineering. It enables systems to react to database changes in near real-time by streaming inserts, updates, and deletes as events. This capability is critical in a wide range of scenarios: synchronizing microservices, feeding real-time dashboards, updating machine learning features, powering audit logs, or building streaming data lakes.

In this tutorial, we’ll walk through how to implement a CDC pipeline using the following components:

  • PostgreSQL as the source database
  • Debezium as the CDC engine
  • Apache Kafka for streaming events
  • Kafka Connect to bridge Debezium and Kafka
  • A Kafka Python consumer to read the data stream in real-time

Let’s dive into setting up a fully functional CDC pipeline from scratch.

Project Setup

Create a file called docker-compose.yml with the code below. It sets up the container environment for the CDC pipeline: ZooKeeper, Kafka, PostgreSQL, and Kafka Connect with Debezium. Kafka advertises a second PLAINTEXT_HOST listener on localhost:9092 so that the Python consumer in Step 4 can connect from the host machine:

YAML
 
version: '3.7'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Internal listener for containers on the compose network, host listener for local clients.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: demo
      POSTGRES_PASSWORD: demo
      POSTGRES_DB: demo

  connect:
    image: debezium/connect:2.2
    depends_on:
      - kafka
      - postgres
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"


Run this command to start the containers and create the environment:

docker-compose up -d
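
The containers, especially Kafka Connect, can take 30 to 60 seconds to become ready. You can check their state with docker-compose ps, or use the small Python sketch below (standard library only) to poll the Kafka Connect REST API until it answers. The URL assumes the 8083:8083 port mapping from the docker-compose.yml above.

Python
 
# wait_for_connect.py -- poll the Kafka Connect REST API until it responds.
# Assumes the 8083:8083 port mapping from the docker-compose.yml above.
import json
import time
import urllib.request

CONNECT_URL = "http://localhost:8083/"

for attempt in range(30):
    try:
        with urllib.request.urlopen(CONNECT_URL, timeout=2) as resp:
            info = json.loads(resp.read().decode("utf-8"))
            print(f"Kafka Connect is up (version {info.get('version')})")
            break
    except Exception:
        print(f"Connect not ready yet (attempt {attempt + 1}), retrying...")
        time.sleep(5)
else:
    raise SystemExit("Kafka Connect did not become available in time")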

Step 1: Enable Logical Replication in PostgreSQL

Debezium relies on logical decoding, so you need to configure PostgreSQL accordingly; the connector will create its own replication slot and publication when it is registered in Step 2.

Enter the PostgreSQL container:

Shell
 
docker exec -it <postgres_container_id> bash
psql -U demo -d demo


Then run:

SQL
 
ALTER SYSTEM SET wal_level = logical;


Because wal_level only takes effect at server start, reloading the configuration with pg_reload_conf() is not enough. Restart the PostgreSQL container (docker restart <postgres_container_id>), reconnect with psql, and confirm the change with SHOW wal_level; before continuing.

Create a test table:

SQL
 
CREATE TABLE customers (
   id SERIAL PRIMARY KEY,
   name VARCHAR(255),
   email VARCHAR(255)
);


Step 2: Register the Debezium PostgreSQL Connector

POST the connector configuration to the Kafka Connect REST API to register the connector. Note that Debezium 2.x expects topic.prefix where older versions used database.server.name:

Shell
 
curl -X POST http://localhost:8083/connectors \
   -H "Content-Type: application/json" \
   -d '{
     "name": "postgres-connector",
     "config": {
       "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
       "database.hostname": "postgres",
       "database.port": "5432",
       "database.user": "demo",
       "database.password": "demo",
       "database.dbname": "demo",
       "database.server.name": "dbserver1",
       "table.include.list": "public.customers",
       "plugin.name": "pgoutput",
       "slot.name": "debezium",
       "publication.name": "dbz_publication"
     }
  }'


Once the connector starts capturing changes, Debezium creates the Kafka topic "dbserver1.public.customers", named after the topic prefix, schema, and table.
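
At this point you can sanity-check the pipeline. The sketch below (not part of the original tutorial) asks the Kafka Connect REST API for the connector status and uses kafka-python to confirm the topic is visible from the host. It assumes the port mappings defined in docker-compose.yml, the connector name used in the registration above, and that kafka-python is already installed (see Step 4).

Python
 
# check_pipeline.py -- verify the connector is RUNNING and the CDC topic exists.
import json
import urllib.request

from kafka import KafkaConsumer

# Connector and task state from the Kafka Connect REST API.
status_url = "http://localhost:8083/connectors/postgres-connector/status"
with urllib.request.urlopen(status_url, timeout=5) as resp:
    status = json.loads(resp.read().decode("utf-8"))
print("Connector state:", status["connector"]["state"])              # expect RUNNING
print("Task states:", [t["state"] for t in status.get("tasks", [])])

# The topic appears once the connector has snapshotted or captured a change.
consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
print("Topic present:", "dbserver1.public.customers" in consumer.topics())
consumer.close()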

Step 3: Insert Data

Insert sample data into the table:

SQL
 
INSERT INTO customers (name, email) VALUES ('Tom', 'tom@google.com');
INSERT INTO customers (name, email) VALUES ('Cruise', 'cruise@google.com');
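
If you want a steady stream of events rather than two one-off inserts, a small generator script helps when testing the consumer in the next step. The sketch below is a hypothetical helper, not part of the original setup: it uses psycopg2 (pip install psycopg2-binary) and assumes you have exposed PostgreSQL to the host by adding a 5432:5432 ports mapping to the postgres service in docker-compose.yml.

Python
 
# insert_load.py -- hypothetical generator that inserts a row every two seconds.
# Assumes PostgreSQL is reachable on localhost:5432 and psycopg2-binary is installed.
import time

import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432, dbname="demo", user="demo", password="demo"
)
conn.autocommit = True

with conn.cursor() as cur:
    for i in range(10):
        cur.execute(
            "INSERT INTO customers (name, email) VALUES (%s, %s)",
            (f"user{i}", f"user{i}@example.com"),
        )
        print(f"Inserted user{i}")
        time.sleep(2)

conn.close()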


Step 4: Consume Data Using Python

Install the Kafka Python client:

Shell
 
pip install kafka-python


Below is the Kafka consumer script. It handles both plain and schema-wrapped JSON events, and skips the tombstone records that follow deletes:

Python
 
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'dbserver1.public.customers',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    # Delete operations are followed by a tombstone with a null value,
    # so guard against empty messages before decoding.
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None
)

print("Listening for changes...\n")

for message in consumer:
    event = message.value
    if event is None:          # tombstone record after a delete
        continue
    # With schemas disabled the converter emits the payload directly;
    # with schemas enabled it is wrapped in a "payload" envelope.
    payload = event.get("payload", event)
    if payload:
        print(f"Operation: {payload['op']}")
        print(f"Before: {payload['before']}")
        print(f"After: {payload['after']}")
        print("=" * 40)


Expected output:

Plain Text
 
Operation: c
Before: None
After: {'id': 1, 'name': 'Tom', 'email': 'tom@google.com'}


The op field in each event indicates the operation (a small dispatch sketch follows the list):

  • c: insert (create)
  • r: snapshot read of a row that existed before the connector started
  • u: update
  • d: delete
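
In a real consumer you would typically branch on these codes and apply each change to a downstream store. The helper below is a hypothetical illustration of that dispatch; upsert_row and delete_row are placeholders for whatever your sink actually needs.

Python
 
# Hypothetical dispatcher: route Debezium change events by operation code.
def upsert_row(row):
    # Illustrative sink: replace with a write to your target store or index.
    print(f"Upserting id={row['id']}: {row}")

def delete_row(row):
    print(f"Deleting id={row['id']}")

def handle_change(payload):
    op = payload.get("op")
    if op in ("c", "r", "u"):          # create, snapshot read, or update
        upsert_row(payload["after"])
    elif op == "d":                    # delete
        delete_row(payload["before"])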

Use Cases for CDC Pipelines

Change Data Capture (CDC) is foundational to building real-time, reactive data systems. Rather than relying on batch ETL jobs or periodic polling, CDC lets you stream changes as they happen, ensuring your downstream systems are always up-to-date. Here are some of the most impactful applications:

  • Data Lakes & Lakehouses
    Organizations are increasingly moving toward centralized data lakes like Amazon S3, Google Cloud Storage, or Delta Lake for scalable storage and analytics. Using CDC, you can stream every insert, update, or delete operation into your data lake in near real-time, allowing data scientists and analysts to work with fresh data without complex batch jobs.
  • Microservices Synchronization
    In distributed architectures, microservices often maintain their own databases. But when one service updates data that another service relies on, syncing becomes a challenge. CDC provides a low-latency, decoupled way to propagate changes across services without the need for direct API calls or shared databases.
  • Real-Time Analytics & Dashboards
    Business teams demand up-to-the-minute metrics. Feeding a dashboard with fresh data from operational systems used to require polling or manual triggers. With CDC, updates are streamed continuously into analytics systems like Apache Druid, ClickHouse, or Redshift for real-time BI.
  • Auditing and Compliance
    In regulated industries like finance, healthcare, or government, it’s crucial to track who changed what and when. CDC provides a reliable stream of every data modification, which can be logged and archived in append-only storage like cloud object stores or immutable logs.
  • Search Indexing
    Platforms like Elasticsearch or OpenSearch require current data to serve accurate results. Instead of manually syncing changes, you can use CDC to automatically reflect database changes in your search index, maintaining a live and searchable copy of your data.
  • Machine Learning Feature Stores
    Feature freshness matters in ML workflows. CDC can push updates to feature stores like Feast or Redis, ensuring models have the latest context at inference time, particularly in personalization, fraud detection, or recommendation engines.

Extending Into Production

While the development setup demonstrates the core principles, deploying a robust CDC pipeline in production requires careful design around reliability, observability, and security:

  • Schema Management with a Schema Registry
    As your database schema evolves, downstream consumers must adapt. Using a schema registry like Confluent Schema Registry ensures that all services adhere to compatible versions of Avro, JSON Schema, or Protobuf definitions, reducing the risk of breaking changes.
  • Monitoring & Observability
    Production-grade systems need clear visibility. Integrate Prometheus with Grafana to monitor Kafka brokers, Kafka Connect health, consumer lag, and Debezium metrics. Set alerts for error spikes, replication lag, or dropped messages.
  • Security Hardening
    Insecure pipelines expose critical data. Use TLS encryption for all communication (Kafka, Connect, PostgreSQL). Enforce SASL or OAuth2 authentication and Kafka ACLs to control producer and consumer access.
  • Scalability & Orchestration
    For large-scale deployments, run your services on Kubernetes using operators like Strimzi or Debezium Operator. These tools help manage rolling updates, fault tolerance, and scaling Kafka Connect workers.
  • Durability and Replayability
    Configure Kafka topic retention policies to retain data long enough for consumers to reprocess events if needed. Implement consumer offset checkpoints (e.g., using Apache Flink or Kafka Streams) to ensure fault-tolerant, exactly-once processing. A minimal consumer-group sketch follows this list.
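
As a concrete starting point for the durability item above, the sketch below adapts the Step 4 consumer to use a consumer group with manual offset commits, so a restarted consumer resumes from its last committed position instead of reprocessing everything. It is a minimal kafka-python illustration giving at-least-once behavior, not the exactly-once guarantees you would get from Kafka Streams or Flink; the group id is illustrative.

Python
 
# Minimal sketch: consumer group with manual offset commits (at-least-once delivery).
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'dbserver1.public.customers',
    bootstrap_servers='localhost:9092',
    group_id='cdc-sink',                # shared group id enables resume and scaling out
    enable_auto_commit=False,           # commit offsets explicitly after processing
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')) if m else None,
)

for message in consumer:
    event = message.value
    if event is not None:
        payload = event.get("payload", event)
        print(f"{payload['op']} -> {payload['after'] or payload['before']}")
    consumer.commit()                   # checkpoint progress after handling the record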

Final Thoughts

Debezium, Apache Kafka, and PostgreSQL together unlock high-throughput, low-latency data replication without rewriting your application code. By observing database change logs, you gain an always-on stream of data mutations (inserts, updates, and deletes) that can fuel downstream consumers with minimal delay.

What makes CDC powerful is not just the technology but the architecture it enables: event-driven systems that are reactive, scalable, and loosely coupled. Whether you're building a real-time feature store, syncing services across regions, or serving live business metrics, CDC is a backbone pattern that makes these systems feasible at scale.

As you take this pipeline from prototype to production, be sure to integrate best practices around schema evolution, observability, and security. With the right setup, CDC isn't just an optimization; it's a core strategy for data-first organizations striving for agility, speed, and precision.
