A modern evolution of my professional graph project — transforming batch-based CDC into a living, breathing real-time stream of data.
This project originates from one of my professional works, where the data pipeline was built on Microsoft SQL Server (MSSQL) using its built-in Change Data Capture (CDC) feature.
In that system, the CDC process tracked inserts, updates, and deletes directly from the transaction log, while a Python batch ETL handled the heavy lifting, rebuilding the entire customer graph once a month.
To simulate that environment in my personal portfolio (since the real code is under NDA), I recreated the entire design using PostgreSQL as a substitute for MSSQL.
This mimic version kept the same architecture and logic, allowing me to freely showcase the system’s functionality without disclosing any proprietary work.
The earlier approach had two layers: MSSQL CDC tables capturing row-level changes from the transaction log, and a scheduled Python batch ETL that polled those tables every 4 hours, with a full graph rebuild once a month.
While this worked reliably, it wasn’t truly real time.
Users couldn’t see fresh data until the next 4-hour cycle, and as data volume grew, the delay became more noticeable, especially for fraud detection and analytics use cases.
To overcome these limitations, I re-engineered the entire pipeline around real-time CDC streaming.
Instead of polling CDC tables on a schedule, this new version listens directly to PostgreSQL’s Write-Ahead Log (WAL) using Debezium, ensuring every change is captured the moment it happens.
The redesigned system now uses:
- Debezium for log-based change capture from PostgreSQL's WAL
- Redpanda as a Kafka-compatible streaming backbone
- A Python CDC consumer (cdc_consumer.py) that applies each event to Neo4j
- The etl_control_db from Part 1 for metadata-driven configuration and logging
This shift transforms the pipeline from a scheduled sync process into a continuous data stream, keeping the graph perfectly aligned with its source databases: no waiting windows, no stale data.

The real-time CDC pipeline operates in three layers:
Debezium connectors monitor PostgreSQL’s Write-Ahead Log (WAL) and capture every INSERT, UPDATE, and DELETE operation:
PostgreSQL WAL → Debezium Connector → Kafka/Redpanda Topics
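The Debezium layer is configured through Kafka Connect. Below is a minimal sketch of what the connector registration payload could look like for the profiles source; the hostname, credentials, and table list are illustrative assumptions, while the connector class and configuration keys are standard Debezium 2.x:

```python
import json

# Hypothetical host, credentials, and table list -- adjust to your setup.
connector = {
    "name": "postgres-source-v1-profiles",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",          # PostgreSQL's built-in logical decoding plugin
        "database.hostname": "localhost",   # assumption
        "database.port": "5432",
        "database.user": "cdc_user",        # assumption
        "database.password": "cdc_password",
        "database.dbname": "profiles_db",
        "topic.prefix": "postgres-source-v1-profiles",
        "table.include.list": "public.accounts,public.tickets",
    },
}

# Registered with a single POST to the Kafka Connect REST API, e.g.
#   curl -X POST -H "Content-Type: application/json" \
#        -d @connector.json http://localhost:8083/connectors
print(json.dumps(connector, indent=2))
```

With this topic.prefix, Debezium publishes each table's changes to a topic named `<prefix>.<schema>.<table>`, which is exactly the naming seen in the topic structure below.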
Key Benefits:
- Changes are captured from the WAL at commit time, with no scheduled polling
- Log-based capture adds no query load to the source tables
- Inserts, updates, and deletes arrive as an ordered, per-table event stream
Redpanda acts as the messaging backbone, organizing changes into topics:
Topic Structure:
├── postgres-source-v1-profiles.public.accounts
├── postgres-source-v1-profiles.public.tickets
├── postgres-source-v1-support.public.accounts
└── postgres-source-v1-support.public.tickets
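Because topics follow the `<prefix>.<schema>.<table>` convention shown above, a consumer can recover routing metadata from the topic name alone. A small illustrative helper:

```python
# Debezium names topics "<prefix>.<schema>.<table>", so the source,
# schema, and table can be recovered from the topic name alone.

def parse_topic(topic):
    prefix, schema, table = topic.rsplit(".", 2)
    return {"source": prefix, "schema": schema, "table": table}

parsed = parse_topic("postgres-source-v1-profiles.public.accounts")
```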
Why Redpanda over Standard Kafka:
- Fully Kafka API-compatible, so Debezium and existing Kafka clients work unchanged
- Ships as a single binary with no ZooKeeper or JVM to operate
- Much lighter to run locally, which suits a self-hosted portfolio deployment
The cdc_consumer.py script continuously processes CDC events and updates Neo4j:
Kafka Consumer → Process CDC Event → Update Neo4j Node
↓
Log to Control DB (cdc_run_logs, cdc_table_logs)
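The consume-and-dispatch loop above can be sketched roughly as follows. This assumes the kafka-python package; the broker address, group id, and the commented-out helper names are hypothetical stand-ins for the real cdc_consumer.py internals:

```python
# Sketch of the cdc_consumer.py main loop (not the actual implementation).

def route_event(event: dict) -> str:
    """Decide which Neo4j action a Debezium event needs."""
    op = event.get("op")
    if op in ("c", "u", "r"):      # create / update / snapshot -> upsert
        return "merge"
    if op == "d":                  # delete -> remove node + relationships
        return "detach_delete"
    return "skip"                  # tombstones or unknown ops

def run_consumer():
    import json
    from kafka import KafkaConsumer   # pip install kafka-python

    consumer = KafkaConsumer(
        "postgres-source-v1-profiles.public.accounts",
        "postgres-source-v1-profiles.public.tickets",
        bootstrap_servers="localhost:9092",          # assumption
        group_id="neo4j-cdc-consumer",               # assumption
        value_deserializer=lambda v: json.loads(v) if v else None,
        auto_offset_reset="earliest",
    )
    for message in consumer:
        if message.value is None:                    # Kafka tombstone
            continue
        action = route_event(message.value)
        # apply_to_neo4j(action, message.value)      # hypothetical helper
        # log_progress(message.topic)                # -> cdc_table_logs
```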
The CDC consumer is designed with the same metadata-driven philosophy as Part 1, but optimized for streaming:
CDCConsumer Class:
- Connects to etl_control_db to load CDC configuration

Configuration Loading:
# Dynamically loads from etl_control_db:
- node_label_mappings (with is_cdc_enabled = true)
- field_exclusion_rules (with is_cdc_excluded = true)
- primary_keys for MERGE operations
- cdc_connectors for Kafka topic configuration
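The dynamic loading described above might look roughly like this; the row dictionaries are stand-ins for the actual SQL query results from etl_control_db:

```python
# Sketch of metadata loading from etl_control_db. The row shapes are
# illustrative; the real pipeline would query node_label_mappings and
# field_exclusion_rules.

def load_cdc_config(mapping_rows, exclusion_rows):
    """Build lookup structures for the streaming consumer."""
    # table -> Neo4j label, restricted to CDC-enabled tables
    labels = {
        r["table_name"]: r["node_label"]
        for r in mapping_rows
        if r["is_cdc_enabled"]
    }
    # table -> set of fields to drop from CDC events
    excluded = {}
    for r in exclusion_rows:
        if r["is_cdc_excluded"]:
            excluded.setdefault(r["table_name"], set()).add(r["column_name"])
    return labels, excluded

labels, excluded = load_cdc_config(
    [{"table_name": "accounts", "node_label": "Account", "is_cdc_enabled": True},
     {"table_name": "audit_log", "node_label": "Audit", "is_cdc_enabled": False}],
    [{"table_name": "accounts", "column_name": "hashed_password",
      "is_cdc_excluded": True}],
)
```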
Message Processing Flow:
1. Parse the operation type (c=create, u=update, d=delete, r=snapshot)
2. Apply field exclusions (e.g. hashed_password)
3. Apply the change to Neo4j (MERGE or DETACH DELETE)
4. Update cdc_table_logs with the message count

The consumer handles the flattened Debezium message format:
{
  "op": "c",
  "before": null,
  "after": {
    "account_id": "92f70463-4e74-431f-a567-8a0f7226b204",
    "email": "user@example.com",
    "full_name": "John Doe",
    "created_at": "2025-11-08T16:09:50.004528Z"
  },
  "source": {
    "version": "2.6.2.Final",
    "connector": "postgresql",
    "name": "postgres-source-v1-profiles",
    "db": "profiles_db",
    "schema": "public",
    "table": "accounts",
    "txId": 5861,
    "lsn": 4872723424
  },
  "ts_ms": 1762618190487
}
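A sketch of how one envelope could be translated into a parameterized Cypher statement. The label and primary key would come from the control-DB metadata, and the EXCLUDED set is an illustrative stand-in for field_exclusion_rules. Note that for delete events Debezium sets "after" to null, so the key must be read from "before":

```python
# Sketch: translate one Debezium envelope into Cypher. Label, primary
# key, and EXCLUDED stand in for the control-DB metadata.

EXCLUDED = {"hashed_password"}

def event_to_cypher(event, label, pk):
    """Return (cypher, params) for one CDC event, or None to skip."""
    op = event["op"]
    # Deletes carry the row state in "before"; "after" is null.
    row = event["before"] if op == "d" else event["after"]
    if row is None:
        return None
    if op == "d":
        return (f"MATCH (n:{label} {{{pk}: $pk}}) DETACH DELETE n",
                {"pk": row[pk]})
    # c / u / r all upsert via MERGE
    props = {k: v for k, v in row.items() if k not in EXCLUDED}
    return (f"MERGE (n:{label} {{{pk}: $pk}}) SET n += $props",
            {"pk": row[pk], "props": props})

cypher, params = event_to_cypher(
    {"op": "c", "before": None,
     "after": {"account_id": "92f70463", "email": "user@example.com",
               "hashed_password": "secret"}},
    label="Account", pk="account_id",
)
```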
Operation Types:
- c (create): INSERT operation → MERGE in Neo4j
- u (update): UPDATE operation → MERGE in Neo4j (upsert)
- d (delete): DELETE operation → DETACH DELETE in Neo4j
- r (read): Initial snapshot → MERGE in Neo4j

The etl_control_db from Part 1 is extended with CDC-specific tables:
cdc_connectors:
- connector_id: Unique identifier
- source_id: Links to source_databases
- connector_name: Debezium connector name
- kafka_topic_prefix: Topic naming convention
- is_active: Enable/disable connector
cdc_run_logs:
- log_id: Unique run identifier
- run_start_time: When CDC consumer started
- run_end_time: When CDC consumer stopped
- status: running | stopped | failed
- total_messages_processed: Count across all tables
cdc_table_logs:
- table_log_id: Unique identifier
- log_id: Links to cdc_run_logs
- source_id: Which source database
- table_name: Which table being tracked
- messages_processed: Real-time message count
- status: processing | completed | failed
- is_progressing: Boolean flag for monitoring
- start_time, end_time: Timestamps
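The is_progressing flag can be derived by comparing the message counter between two heartbeat checks. A minimal in-memory sketch; in the real consumer these values would be persisted to cdc_table_logs via SQL:

```python
# Sketch of the progress tracking behind cdc_table_logs. The in-memory
# counters are illustrative; in the real consumer each change maps to
# an UPDATE against etl_control_db.

class TableLog:
    """In-memory mirror of one cdc_table_logs row."""

    def __init__(self, table_name):
        self.table_name = table_name
        self.messages_processed = 0
        self.status = "processing"
        self._count_at_last_check = 0

    def record_message(self):
        self.messages_processed += 1

    def heartbeat(self):
        """Compute is_progressing: did the count advance since last check?"""
        progressing = self.messages_processed > self._count_at_last_check
        self._count_at_last_check = self.messages_processed
        return progressing

log = TableLog("accounts")
log.record_message()
log.record_message()
```

A monitoring job could then flag a table as stalled when heartbeat() stays False across several consecutive checks.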
node_label_mappings:
- is_cdc_enabled column to mark tables for real-time sync

field_exclusion_rules:
- is_cdc_excluded column to filter fields out of CDC events

-- Execute in PostgreSQL
INSERT INTO accounts (email, hashed_password, full_name)
VALUES ('new.user@email.com', 'hashed_pwd', 'New User');
What Happens:
1. Debezium captures the committed INSERT from the WAL
2. The event lands on the postgres-source-v1-profiles.public.accounts topic
3. The CDC consumer MERGEs a new Account node into Neo4j within moments
-- Execute in PostgreSQL
UPDATE accounts
SET full_name = 'Updated Name'
WHERE email = 'user@email.com';
CDC Consumer Log:
[DEBUG] Processing message - Table: accounts, Operation: u
[DEBUG] UPDATE data: {account_id: '...', email: '...', full_name: 'Updated Name'}
[OK] Updated Account (account_id=...)
[DEBUG] Neo4j counters - properties_set: 3
-- Execute in PostgreSQL
DELETE FROM accounts WHERE email = 'user@email.com';
CDC Consumer Cypher:
MATCH (n:Account {account_id: '...'})
DETACH DELETE n
Removes the node and all of its relationships instantly.
The CDC consumer provides comprehensive logging:
[LOG] Started CDC run log (ID: 4)
[LOG] Started table log for accounts (ID: 7)
[LOG] Started table log for tickets (ID: 8)
[INFO] Waiting for messages...
[DEBUG] Processing message - Topic: postgres-source-v1-profiles.public.accounts
[OK] Updated Account (account_id=...)
[PROGRESS] Processed 100 messages
[HEARTBEAT] Consumer active - 245 messages processed so far
Check Current CDC Run:
SELECT
log_id,
run_start_time,
status,
total_messages_processed,
run_end_time - run_start_time AS duration
FROM cdc_run_logs
WHERE status = 'running'
ORDER BY run_start_time DESC;
Table Processing Status:
SELECT
tl.table_name,
tl.status,
tl.messages_processed,
tl.is_progressing,
EXTRACT(EPOCH FROM (NOW() - tl.start_time)) AS seconds_running
FROM cdc_table_logs tl
JOIN cdc_run_logs rl ON rl.log_id = tl.log_id
WHERE rl.status = 'running'
ORDER BY tl.messages_processed DESC;
CDC Performance Metrics:
SELECT
table_name,
AVG(messages_processed) AS avg_messages,
MAX(messages_processed) AS max_messages,
AVG(EXTRACT(EPOCH FROM (end_time - start_time))) AS avg_duration_seconds
FROM cdc_table_logs
WHERE status = 'completed'
GROUP BY table_name
ORDER BY avg_messages DESC;
-- Step 1: Mark table as CDC-enabled
UPDATE node_label_mappings
SET is_cdc_enabled = true
WHERE table_name = 'subscriptions';
-- Step 2: Add field exclusions for CDC
INSERT INTO field_exclusion_rules
(table_name, column_name, is_cdc_excluded)
VALUES
('subscriptions', 'internal_notes', true),
('subscriptions', 'stripe_customer_id', true);
-- Step 3: Restart CDC consumer to pick up changes
INSERT INTO cdc_connectors
(source_id, connector_name, kafka_topic_prefix, is_active)
VALUES
(1, 'postgres-profiles-connector', 'postgres-source-v1-profiles', true);
-- Temporarily disable CDC for a table
UPDATE node_label_mappings
SET is_cdc_enabled = false
WHERE table_name = 'accounts';
-- CDC consumer will skip this table until re-enabled
Terminal 1: Start CDC Consumer
python cdc_consumer.py
Terminal 2: Execute SQL Changes
-- Test INSERT
INSERT INTO accounts (email, hashed_password, full_name)
VALUES ('test@example.com', 'hashed', 'Test User');
-- Test UPDATE
UPDATE accounts
SET full_name = 'Test User Updated'
WHERE email = 'test@example.com';
-- Test DELETE
DELETE FROM accounts WHERE email = 'test@example.com';
Terminal 3: Monitor Neo4j
// Watch for changes in real-time
MATCH (a:Account)
WHERE a.email CONTAINS 'test'
RETURN a
Expected Results:
- After the INSERT, a new Account node appears in the query results almost immediately
- After the UPDATE, the node's full_name changes to 'Test User Updated'
- After the DELETE, the node and its relationships vanish from the graph
From Professional Project to Modern Architecture:
The original MSSQL CDC implementation taught me that change data capture is powerful, but implementation matters. This rebuild with Debezium and Redpanda shows how the same capture concept, moved from scheduled polling to log-based streaming, removes the waiting windows and stale reads of the batch design.
This project demonstrates the evolution from traditional batch ETL to modern streaming architectures, a journey many enterprises are taking today.
Related Projects: