Change Data Capture (CDC) is a key pattern in modern data engineering that lets you track and stream changes from a database in near real time. It is useful for syncing microservices, building data lakes, or feeding analytics systems with the latest data.
In this article, we will look at how to set up a CDC pipeline using:
- PostgreSQL as the source database
- Debezium as the CDC engine
- Apache Kafka for event streaming
- Kafka Connect to integrate Debezium and PostgreSQL
- A Kafka consumer application in Python to read changes
Project Setup
Create a file called docker-compose.yml with the contents below. It defines the containers for the CDC setup: ZooKeeper, Kafka, PostgreSQL, and Kafka Connect with Debezium.
version: '3.7'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Containers reach the broker at kafka:29092; programs on the host use localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: demo
      POSTGRES_PASSWORD: demo
      POSTGRES_DB: demo
  connect:
    image: debezium/connect:2.2
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
Run this command in bash to start the containers and create the environment: docker-compose up -d
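Kafka Connect can take a little while to accept requests after the containers start. Here is a minimal sketch of a readiness check, assuming the Connect REST API is published on localhost:8083 as in the compose file. The helper names connect_is_up and wait_until_ready are hypothetical, written for this example and not part of any library:

```python
import time
import urllib.error
import urllib.request


def connect_is_up(url="http://localhost:8083/connectors"):
    """Return True if the Kafka Connect REST API answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=2) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timed out, or an HTTP error: not ready yet
        return False


def wait_until_ready(probe=connect_is_up, attempts=30, delay=2.0, sleep=time.sleep):
    """Call probe() until it returns True or the attempts run out."""
    for attempt in range(1, attempts + 1):
        if probe():
            return attempt  # number of tries it took
        sleep(delay)
    return None  # never became ready

# Usage: wait_until_ready() returns the attempt number, or None on timeout
```

The probe and sleep parameters are injectable so the retry loop can be exercised without a running cluster.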
Step-1 Enable Logical Replication in PostgreSQL
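Debezium relies on PostgreSQL logical decoding, so the server must run with wal_level set to logical. Debezium creates the replication slot itself when the connector starts.
Enter the PostgreSQL container and open psql:
docker exec -it <postgres_container_id> bash
psql -U demo -d demo
Change the setting:
ALTER SYSTEM SET wal_level = logical;
wal_level is only read at server start, so reloading the configuration with pg_reload_conf() is not enough. Exit psql and the container, then restart PostgreSQL:
docker restart <postgres_container_id>
Reconnect with the two commands above, confirm the setting, and create the table:
SHOW wal_level;
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(255),
    email VARCHAR(255)
);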
Step-2 Register Debezium PostgreSQL connector
Send the connector configuration to the Kafka Connect REST API to register the connector:
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "postgres-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "demo",
      "database.password": "demo",
      "database.dbname": "demo",
      "topic.prefix": "dbserver1",
      "table.include.list": "public.customers",
      "plugin.name": "pgoutput",
      "slot.name": "debezium",
      "publication.name": "dbz_publication"
    }
  }'
Debezium 2.x uses topic.prefix, which replaced the older database.server.name property. Change events for the table are published to the Kafka topic dbserver1.public.customers, named after the topic prefix, schema, and table.
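Before inserting data, it helps to confirm that the connector actually started. The sketch below reads the Kafka Connect status endpoint (GET /connectors/<name>/status) and checks the reported states. It assumes the REST API is reachable on localhost:8083; fetch_status, is_healthy, and topic_for are hypothetical helper names used only for this illustration:

```python
import json
import urllib.request


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=5) as response:
        return json.load(response)


def is_healthy(status):
    """True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    # An empty task list means the connector has not started any work yet
    return connector_ok and bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def topic_for(prefix, schema, table):
    """Build the topic name Debezium uses for a table: <prefix>.<schema>.<table>."""
    return f"{prefix}.{schema}.{table}"

# Usage: is_healthy(fetch_status("postgres-connector"))
```

A task in the FAILED state usually carries a trace field in the same response, which is a good first place to look when no events arrive.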
Step-3 Insert Data
Insert sample data into the table:
INSERT INTO customers (name, email) VALUES ('Tom', 'tom@example.com');
INSERT INTO customers (name, email) VALUES ('Cruise', 'cruise@example.com');
Step-4 Consume Data Using Python
Install the Kafka Python client
pip install kafka-python
See the script for Kafka consumer below:
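from kafka import KafkaConsumer
import json


def deserialize(raw):
    # A delete is followed by a tombstone record whose value is null
    return json.loads(raw.decode('utf-8')) if raw is not None else None


consumer = KafkaConsumer(
    'dbserver1.public.customers',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    value_deserializer=deserialize
)

print("Listening for changes...\n")

for message in consumer:
    event = message.value
    if event is None:
        continue  # skip tombstones
    # With schemas.enable=false the event is the change envelope itself;
    # with schemas enabled the envelope is nested under "payload"
    payload = event.get("payload", event)
    print(f"Operation: {payload['op']}")
    print(f"Before: {payload['before']}")
    print(f"After: {payload['after']}")
    print("=" * 40)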
Expected output:
Operation: c
Before: None
After: {'id': 1, 'name': 'Tom', 'email': 'tom@example.com'}
The op field indicates the operation:
- c: insert (create)
- u: update
- d: delete
- r: read (a row captured during the connector's initial snapshot)
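To show how the op codes drive downstream logic, here is a minimal sketch that applies change events to an in-memory copy of the table. It assumes rows are keyed by the id column of the customers table. The function apply_change is a hypothetical helper for illustration, not part of Debezium or kafka-python:

```python
def apply_change(replica, event):
    """Apply one Debezium change event to a dict keyed by primary key."""
    if event is None:
        return replica  # tombstone record that follows a delete
    payload = event.get("payload", event)  # unwrap if schemas are enabled
    op = payload["op"]
    if op in ("c", "u", "r"):
        # Inserts, updates, and snapshot reads all carry the new row in "after"
        row = payload["after"]
        replica[row["id"]] = row
    elif op == "d":
        # Deletes carry the old row (at least its key) in "before"
        replica.pop(payload["before"]["id"], None)
    return replica  # any other op code is ignored
```

Calling apply_change(replica, message.value) inside the consumer loop would keep the dict in step with the table.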
Common applications of this type of pipeline include:
- Data lakes: Streaming DB changes into S3 or Delta Lake.
- Microservices: Syncing state changes across distributed services.
- Analytics: Feeding real-time dashboards with up-to-the-minute data.
- Auditing: Tracking who changed what and when in compliance-heavy industries.
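As an illustration of the auditing case, the sketch below turns a change event into a small audit record using the source block that Debezium attaches to each event. Two assumptions apply. First, old column values are only present in before when the table uses REPLICA IDENTITY FULL (for example, ALTER TABLE customers REPLICA IDENTITY FULL;), so the code reports None when they are missing. Second, the events do not say which user made the change, so that would have to come from the data itself. The helpers changed_columns and audit_record are hypothetical names:

```python
from datetime import datetime, timezone


def changed_columns(before, after):
    """Names of columns whose values differ; None when old or new values are unavailable."""
    if before is None or after is None:
        return None
    return sorted(col for col in after if before.get(col) != after[col])


def audit_record(event):
    """Summarize one change event: which table, which operation, when, and what changed."""
    payload = event.get("payload", event)  # unwrap if schemas are enabled
    source = payload.get("source", {})
    ts_ms = source.get("ts_ms")  # time of the change in the database, in milliseconds
    changed_at = (
        datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc).isoformat()
        if ts_ms is not None
        else None
    )
    return {
        "table": f"{source.get('schema')}.{source.get('table')}",
        "operation": payload["op"],
        "changed_at": changed_at,
        "changed": changed_columns(payload.get("before"), payload.get("after")),
    }
```

Records like this could be appended to a log table or object store, depending on the retention rules that apply.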
Final thoughts
Change Data Capture with Debezium, Kafka, and PostgreSQL enables real-time data replication without intrusive changes to your application code. To take this setup toward production, integrate a schema registry, add Prometheus/Grafana monitoring, secure the cluster with TLS and ACLs, and containerize your consumer services. If you are building data products or moving to event-driven systems, CDC is one of the most powerful tools in your data engineering toolkit.