A metadata-driven ETL pipeline to unify siloed PostgreSQL databases into a Neo4j graph for advanced fraud detection and customer analysis.
This project is a recreation of one of my first professional projects from my early years as a software engineer. The original version, which is under a strict NDA, used a stack of MSSQL Server, Ontotext GraphDB (a semantic graph database), and Python, with a separate PostgreSQL database just for fetching configurations.
After the graph was populated, my team and I also built a REST API (using Express.js) on top of it. This API served the aggregated graph data to our front-end applications, providing the rich, connected data that the clients needed.
To showcase this architecture in my portfolio, I’ve rebuilt it from memory using a substitute stack: PostgreSQL (replacing MSSQL) and Neo4j (replacing Ontotext).

In a typical enterprise, critical business data is “siloed” across different databases that cannot communicate. This project simulates that real-world problem:
profiles_db (Postgres): A modern microservice database for user accounts, subscriptions, and billing.
streaming_db (Postgres): A high-volume analytics database for user streaming history (play events, IP addresses, devices).
support_db (Postgres): A legacy support desk database for customer tickets.
This separation makes it “impossible” to answer critical, cross-functional questions in real time. For example: “How many users who share a payment method with a ‘hacked’ account are also sharing an IP address with other users?”
This pipeline solves the silo problem by extracting, transforming, and loading data from all three sources into a single, unified Neo4j graph.
By connecting data based on shared identifiers (like email, account_id, card_fingerprint), we build a “Customer 360” view that enables powerful analysis that was previously impossible.
This isn’t just a simple script; it’s a robust, configurable pipeline.
100% Metadata-Driven: The entire ETL process is controlled by the etl_control_db. There are no hardcoded table names, relationships, or credentials in the Python code.
Dynamic Schema Mapping: You can add/remove tables, exclude sensitive columns (like hashed_password), and define new node labels just by editing rows in the control database.
Auditable & Logged: Every ETL run is logged in etl_run_logs, and every table processed is tracked in etl_table_logs, giving you a complete audit trail.
Database-First Design: The pipeline reads all configuration, rules, and mappings directly from the database, making it a “single source of truth.”
etl_control_db Architecture
The etl_control_db is the “brains” of the entire pipeline. The Python script is stateless and reads all its instructions from this database.
Here is a breakdown of the key configuration tables:
source_databases: The script queries this table first. For every source where is_active = true, it fetches the connection details and begins processing.
field_exclusion_rules: For each table it processes (e.g., accounts), the script fetches all rules for that table. It dynamically builds its SELECT statement to only include columns where is_excluded = false. This is how PII like hashed_password is filtered out.
node_label_mappings: When the script processes a source table such as accounts, it looks the table up here and finds it should create nodes with the label :Account.
relationship_mappings: Defines how nodes are connected. The script uses these rows to build its MATCH...WHERE...CREATE statements. For example: from_label: Account, to_label: PaymentMethod, relationship_type: HAS_PAYMENT_METHOD, join_condition: a.account_id = pm.account_id.
etl_run_logs & etl_table_logs: The script creates an etl_run_logs entry when it starts. It then creates an etl_table_logs entry for every table it’s about to process. As it completes each table, it updates the row with status, rows_processed, etc., giving you a granular, real-time view of the pipeline’s progress.
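To make the exclusion logic concrete, here is a minimal sketch of how a dynamic SELECT could be built from field_exclusion_rules. It assumes a psycopg2 connection to etl_control_db and columns named table_name, column_name, and is_excluded; the helper name build_select is illustrative, not the actual main.py API.

import psycopg2

def build_select(control_conn, source_table: str) -> str:
    """Builds a SELECT that includes only columns not marked as excluded."""
    with control_conn.cursor() as cur:
        cur.execute(
            "SELECT column_name FROM field_exclusion_rules "
            "WHERE table_name = %s AND is_excluded = false",
            (source_table,),
        )
        columns = [row[0] for row in cur.fetchall()]
    return f"SELECT {', '.join(columns)} FROM {source_table}"

# e.g. build_select(conn, 'accounts')
# -> "SELECT account_id, email, full_name, ... FROM accounts" (hashed_password omitted)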
The Orchestrator (main.py)
The main.py script acts as the orchestrator, bringing all the components together. It operates in two distinct phases:
Phase 1: Extraction & Cypher Generation (PostgresToNeo4jETL Class)
This phase reads from all Postgres sources and generates a single, massive Cypher script.
The main() function instantiates ETLConfig (from .env) and passes it to the PostgresToNeo4jETL class. The class connects to the etl_control_db.
It creates a new entry in etl_run_logs to get a log_id for this session.
It queries the source_databases table to get a list of all active databases to process.
For each table in each active source (e.g., profiles_db.accounts), it:
reads field_exclusion_rules to dynamically build a SELECT statement that omits columns like hashed_password;
reads node_label_mappings to find the correct Neo4j label (e.g., accounts -> :Account);
runs the resulting query against the source (e.g., SELECT account_id, email, full_name... FROM accounts);
generates CREATE statements for every row (see the sketch below) and logs the task to etl_table_logs.
Once all nodes are generated, it reads the relationship_mappings table. It uses this metadata to build the complex, cross-database MATCH...WHERE...CREATE statements that connect the nodes.
Finally, it writes everything to a timestamped file (e.g., graph_output_20251102.cypher) and registers it in the cypher_scripts table.
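The row-to-Cypher step can be illustrated with a small sketch. The function name row_to_create and the quoting rules are assumptions for illustration; the actual PostgresToNeo4jETL class may handle type conversion and escaping differently.

def row_to_create(label: str, row: dict) -> str:
    """Renders one source row as a Cypher CREATE statement."""
    props = []
    for key, value in row.items():
        if value is None:
            continue  # skip NULLs rather than writing empty properties
        if isinstance(value, (int, float)):
            props.append(f"{key}: {value}")
        else:
            escaped = str(value).replace("'", "\\'")  # escape quotes for Cypher string literals
            props.append(f"{key}: '{escaped}'")
    return f"CREATE (:{label} {{{', '.join(props)}}})"

# e.g. row_to_create("Account", {"account_id": 1, "email": "a@b.com"})
# -> CREATE (:Account {account_id: 1, email: 'a@b.com'})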
Phase 2: Loading (Neo4jLoader Class)
This phase takes the generated script and executes it against Neo4j.
The main() function instantiates the Neo4jLoader with the Neo4j credentials from the .env file. The class connects to your Neo4j Aura instance.
If clear_database=True is set in main(), the loader first runs MATCH (n) DETACH DELETE n to wipe the graph clean.
The loader reads the graph_output.cypher file (passed to it from main()), splits its content into a list of individual Cypher statements, and executes them one by one.
Finally, it connects to the etl_control_db one last time to update the cypher_scripts table with a last_run_time to log the successful load.
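The loading phase can be sketched with the official neo4j Python driver. This is a simplified stand-in for the real Neo4jLoader, assuming the generated file separates statements with semicolons.

from neo4j import GraphDatabase

def load_cypher_file(uri: str, user: str, password: str, path: str, clear_database: bool = False) -> None:
    """Executes every statement in a generated .cypher file against Neo4j."""
    driver = GraphDatabase.driver(uri, auth=(user, password))
    with driver.session() as session:
        if clear_database:
            session.run("MATCH (n) DETACH DELETE n")  # wipe the graph first
        with open(path, encoding="utf-8") as f:
            statements = [s.strip() for s in f.read().split(";") if s.strip()]
        for statement in statements:
            session.run(statement)  # run each CREATE / MATCH...CREATE in order
    driver.close()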
The pipeline automatically generates the following graph structure based on the rules in the control database:
Nodes:
(:Account)
(:Plan)
(:PaymentMethod)
(:Subscription)
(:Media)
(:PlayEvent)
(:Ticket)
(:Tag)
(:TicketTagLink)
Relationships:
(:Account) -[:HAS_PAYMENT_METHOD]-> (:PaymentMethod)
(:Account) -[:HAS_SUBSCRIPTION]-> (:Subscription)
(:Subscription) -[:SUBSCRIBED_TO]-> (:Plan)
(:PlayEvent) -[:PLAYED]-> (:Media)
(:Ticket) -[:TAGGED_WITH]-> (:Tag)
(:Account) -[:SUBMITTED]-> (:Ticket) (Links profiles_db to support_db)
(:Account) -[:WATCHED]-> (:PlayEvent) (Links profiles_db to streaming_db)
With the unified graph, you can now run powerful analytical queries that would be impractical to answer across three separate SQL databases. This list contains the 10 queries that were validated to return data with the loaded sample set.
IMPORTANT: These queries may use multiple MATCH and WITH clauses. You must copy and run each query as a single, complete block, from the first clause to the final RETURN or LIMIT. Running them line by line will cause an “Incomplete query” error.
1. PAYMENT FRAUD: Multiple accounts using same credit card
MATCH (a1:Account)-[:HAS_PAYMENT_METHOD]->(pm1:PaymentMethod)
MATCH (a2:Account)-[:HAS_PAYMENT_METHOD]->(pm2:PaymentMethod)
WHERE a1.account_id < a2.account_id
AND pm1.card_fingerprint = pm2.card_fingerprint
RETURN a1.email AS account_1,
a2.email AS account_2,
pm1.card_fingerprint AS shared_card_fingerprint,
pm1.card_type AS card_type,
'***' + pm1.card_last_four AS card_ending
ORDER BY shared_card_fingerprint;
2. CUSTOMER LIFETIME VALUE: Complete profile
MATCH (a:Account)
OPTIONAL MATCH (a)-[:HAS_SUBSCRIPTION]->(s:Subscription)-[:SUBSCRIBED_TO]->(p:Plan)
OPTIONAL MATCH (a)-[:WATCHED]->(pe:PlayEvent)-[:PLAYED]->(m:Media)
OPTIONAL MATCH (a)-[:SUBMITTED]->(t:Ticket)
WITH a, s, p,
COUNT(DISTINCT pe) AS total_streams,
COUNT(DISTINCT m) AS unique_content,
COUNT(DISTINCT t) AS tickets,
duration.between(a.created_at, datetime()).months AS months_active
RETURN a.email,
a.full_name,
p.plan_name AS plan,
toFloat(p.monthly_fee) * months_active AS estimated_revenue,
total_streams,
unique_content,
tickets,
months_active,
CASE
WHEN total_streams > 20 THEN 'Power User'
WHEN total_streams > 10 THEN 'Regular User'
WHEN total_streams > 0 THEN 'Casual User'
ELSE 'At-Risk (No Activity)'
END AS segment
ORDER BY estimated_revenue DESC
LIMIT 20;
3. CHURN PREDICTION: Find at-risk customers
// NOTE: 'now' is hardcoded to 2025-11-03 so the query works with the sample data
WITH datetime('2025-11-03T00:00:00') AS now
MATCH (a:Account)-[:HAS_SUBSCRIPTION]->(s:Subscription)
OPTIONAL MATCH (a)-[:WATCHED]->(pe:PlayEvent)
WHERE pe.event_timestamp > now - duration({days: 30})
OPTIONAL MATCH (a)-[:SUBMITTED]->(t:Ticket)
WHERE t.created_at > now - duration({days: 30})
WITH a, s,
COUNT(DISTINCT pe) AS recent_streams,
COUNT(DISTINCT t) AS recent_tickets,
duration.between(a.created_at, now).months AS age_months
WHERE s.status IN ['active', 'Active']
RETURN a.email,
recent_streams,
recent_tickets,
age_months,
CASE
WHEN recent_streams = 0 AND recent_tickets > 0 THEN 'HIGH RISK - Problems + No Usage'
WHEN recent_streams = 0 THEN 'HIGH RISK - No Activity'
WHEN recent_streams < 5 THEN 'MEDIUM RISK - Low Engagement'
ELSE 'LOW RISK - Active User'
END AS churn_risk
ORDER BY
recent_streams ASC,
recent_tickets DESC
LIMIT 20;
4. GENRE PREFERENCES: What each user loves
MATCH (a:Account)-[:WATCHED]->(pe:PlayEvent)-[:PLAYED]->(m:Media)
WITH a, m.genre AS genre, COUNT(*) AS watches
ORDER BY a.account_id, watches DESC
WITH a, COLLECT({genre: genre, count: watches})[0..3] AS top_genres
RETURN a.email,
a.full_name,
[g IN top_genres | g.genre + ' (' + g.count + ' plays)'] AS favorite_genres
LIMIT 20;
5. REVENUE BY PLAN
MATCH (p:Plan)<-[:SUBSCRIBED_TO]-(s:Subscription)<-[:HAS_SUBSCRIPTION]-(a:Account)
WHERE s.status IN ['active', 'Active']
OPTIONAL MATCH (a)-[:WATCHED]->(pe:PlayEvent)
WITH p,
COUNT(DISTINCT a) AS subscribers,
toFloat(p.monthly_fee) * COUNT(DISTINCT a) AS monthly_revenue,
COUNT(DISTINCT pe) AS total_streams
RETURN p.plan_name,
subscribers,
monthly_revenue,
ROUND(toFloat(total_streams) / subscribers, 1) AS avg_streams_per_user,
p.max_devices
ORDER BY monthly_revenue DESC;
6. MOST POPULAR CONTENT
MATCH (m:Media)<-[:PLAYED]-(pe:PlayEvent)
WITH m,
COUNT(*) AS total_events,
COUNT(CASE WHEN pe.action IN ['play', 'START'] THEN 1 END) AS starts,
COUNT(CASE WHEN pe.action IN ['stop', 'FINISH'] THEN 1 END) AS completions
WHERE starts > 0
RETURN m.title,
m.genre,
m.media_type,
m.release_date,
starts AS play_starts,
completions,
ROUND(100.0 * completions / starts, 1) AS completion_rate_pct
ORDER BY starts DESC
LIMIT 20;
7. DEVICE ANALYSIS
MATCH (pe:PlayEvent)<-[:WATCHED]-(a:Account)
WITH pe.device_fingerprint AS device,
COUNT(*) AS usage_count,
COLLECT(DISTINCT a.email) AS users
ORDER BY usage_count DESC
LIMIT 10
OPTIONAL MATCH (a2:Account {email: users[0]})-[:SUBMITTED]->(t:Ticket)-[:TAGGED_WITH]->(tag:Tag)
WHERE tag.tag_name IN ['Technical', 'Playback']
RETURN device,
usage_count,
SIZE(users) AS unique_users,
COUNT(DISTINCT t) AS technical_tickets
ORDER BY usage_count DESC;
8. COMPLETE CUSTOMER JOURNEY
MATCH (a:Account {email: 'john.smith@email.com'})
OPTIONAL MATCH (a)-[:HAS_SUBSCRIPTION]->(s:Subscription)-[:SUBSCRIBED_TO]->(p:Plan)
OPTIONAL MATCH (a)-[:HAS_PAYMENT_METHOD]->(pm:PaymentMethod)
OPTIONAL MATCH (a)-[:WATCHED]->(pe:PlayEvent)-[:PLAYED]->(m:Media)
OPTIONAL MATCH (a)-[:SUBMITTED]->(t:Ticket)-[:TAGGED_WITH]->(tag:Tag)
RETURN a.email AS customer,
a.created_at AS signup_date,
p.plan_name AS plan,
s.status AS subscription_status,
pm.card_type + ' •••• ' + pm.card_last_four AS payment,
COUNT(DISTINCT m) AS content_watched,
COUNT(DISTINCT pe) AS total_streams,
COUNT(DISTINCT t) AS tickets,
COLLECT(DISTINCT tag.tag_name)[0..5] AS ticket_topics;
9. MOST ACTIVE STREAMERS
MATCH (a:Account)-[:WATCHED]->(pe:PlayEvent)-[:PLAYED]->(m:Media)
WITH a,
COUNT(DISTINCT pe) AS total_streams,
COUNT(DISTINCT m) AS unique_content,
COUNT(DISTINCT m.genre) AS genres_watched,
COLLECT(DISTINCT m.genre)[0..5] AS top_genres
ORDER BY total_streams DESC
LIMIT 10
RETURN a.email,
a.full_name,
total_streams,
unique_content,
genres_watched,
top_genres;
10. ACCOUNT ACTIVITY HEATMAP
MATCH (pe:PlayEvent)
WITH pe.event_timestamp.hour AS hour_of_day, COUNT(*) AS streams
ORDER BY hour_of_day
RETURN hour_of_day,
streams,
CASE
WHEN streams > 5 THEN 'Peak'
WHEN streams > 3 THEN 'High'
WHEN streams > 1 THEN 'Medium'
ELSE 'Low'
END AS activity_level
ORDER BY hour_of_day;
Setup PostgreSQL:
Create your four databases: etl_control_db, profiles_db, streaming_db, support_db.
Run all the _schema.sql and sample_data.sql scripts to create the tables and load the sample data.
Important: You must update the source_databases table inside etl_control_db with the correct db_password for your three source DBs.
Environment Setup:
Create a .env file in the root directory:
# Credentials for the main Control Database
ETL_DB_HOST=localhost
ETL_DB_PORT=5432
ETL_DB_NAME=etl_control_db
ETL_DB_USER=your_postgres_user
ETL_DB_PASSWORD=your_postgres_password
# Credentials for the target Neo4j Aura Database
NEO4J_URI=neo4j+s://your-aura-db-id.databases.neo4j.io
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=your_neo4j_password
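For reference, here is a minimal sketch of how these variables can be loaded into the ETLConfig object mentioned above, using python-dotenv; the exact field names are assumptions and may differ from the real class.

import os
from dataclasses import dataclass
from dotenv import load_dotenv

@dataclass
class ETLConfig:
    etl_db_host: str
    etl_db_port: int
    etl_db_name: str
    etl_db_user: str
    etl_db_password: str
    neo4j_uri: str
    neo4j_username: str
    neo4j_password: str

    @classmethod
    def from_env(cls) -> "ETLConfig":
        load_dotenv()  # reads the .env file in the working directory
        return cls(
            etl_db_host=os.getenv("ETL_DB_HOST", "localhost"),
            etl_db_port=int(os.getenv("ETL_DB_PORT", "5432")),
            etl_db_name=os.getenv("ETL_DB_NAME", "etl_control_db"),
            etl_db_user=os.getenv("ETL_DB_USER", ""),
            etl_db_password=os.getenv("ETL_DB_PASSWORD", ""),
            neo4j_uri=os.getenv("NEO4J_URI", ""),
            neo4j_username=os.getenv("NEO4J_USERNAME", "neo4j"),
            neo4j_password=os.getenv("NEO4J_PASSWORD", ""),
        )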
Install Python dependencies:
pip install psycopg2-binary neo4j python-dotenv
Run the Pipeline:
Execute the main script. This will:
Read all data from Postgres.
Generate a massive graph_output_[timestamp].cypher file.
Connect to Neo4j (clearing it first, if clear_database=True).
Execute the generated Cypher script to build the graph.
python main.py