Customer 360 Graph Pipeline
A metadata-driven ETL pipeline to unify siloed PostgreSQL databases into a Neo4j graph for advanced fraud detection and customer analysis.
Project Origin
This project is a recreation of one of my first professional projects from my early years as a software engineer. The original version, which is under a strict NDA, used a stack of MSSQL Server, Ontotext GraphDB (a semantic graph database), and Python, with a separate PostgreSQL database just for fetching configurations.
After the graph was populated, my team and I also built a REST API (using Express.js) on top of it. This API served the aggregated graph data to our front-end applications, providing the rich, connected data that the clients needed.
To showcase this architecture in my portfolio, I’ve rebuilt it from memory using a substitute stack: PostgreSQL (replacing MSSQL) and Neo4j (replacing Ontotext).

1. The Core Problem: Siloed Data
In a typical enterprise, critical business data is “siloed” across different databases that cannot communicate. This project simulates that real-world problem:
- `profiles_db` (Postgres): A modern microservice database for user accounts, subscriptions, and billing.
- `streaming_db` (Postgres): A high-volume analytics database for user streaming history (play events, IP addresses, devices).
- `support_db` (Postgres): A legacy support desk database for customer tickets.
This separation makes it effectively impossible to answer critical, cross-functional questions in real time. For example: “How many users who share a payment method with a ‘hacked’ account are also sharing an IP address with other users?”
2. The Solution: A Unified Graph
This pipeline solves the silo problem by extracting, transforming, and loading data from all three sources into a single, unified Neo4j graph.
By connecting data based on shared identifiers (like `email`, `account_id`, and `card_fingerprint`), we build a “Customer 360” view that enables powerful analysis that was previously impossible.
3. Key Features
This isn’t just a simple script; it’s a robust, configurable pipeline.
- 100% Metadata-Driven: The entire ETL process is controlled by the `etl_control_db`. There are no hardcoded table names, relationships, or credentials in the Python code (see the example after this list).
- Dynamic Schema Mapping: You can add/remove tables, exclude sensitive columns (like `hashed_password`), and define new node labels just by editing rows in the control database.
- Auditable & Logged: Every ETL run is logged in `etl_run_logs`, and every table processed is tracked in `etl_table_logs`, giving you a complete audit trail.
- Database-First Design: The pipeline reads all configuration, rules, and mappings directly from the database, making it a “single source of truth.”
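To make the “metadata-driven” claim concrete, here is a hypothetical sketch of registering a relationship purely as a row in the control database. The column names follow the `relationship_mappings` description in the next section; the exact schema and alias conventions in this repo may differ.

```python
import psycopg2

# Hypothetical sketch: defining a relationship as metadata only.
# Column names mirror the relationship_mappings description below;
# the real schema may differ slightly.
with psycopg2.connect(dbname="etl_control_db", user="postgres", password="...") as conn:
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO relationship_mappings
                (from_label, to_label, relationship_type, join_condition)
            VALUES (%s, %s, %s, %s)
            """,
            ("Account", "Ticket", "SUBMITTED", "a.account_id = t.account_id"),
        )
# The next pipeline run picks up this row and generates the matching Cypher;
# no Python changes are required.
```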
4. The etl_control_db Architecture
The etl_control_db is the “brains” of the entire pipeline. The Python script is stateless and reads all its instructions from this database.
Here is a breakdown of the key configuration tables:
- `source_databases`:
  - Purpose: A registry of all source databases the ETL can connect to.
  - How it’s used: The Python script queries this table at startup. For every database where `is_active = true`, it fetches the connection details and begins processing.
- `field_exclusion_rules`:
  - Purpose: A dynamic “blocklist” to prevent sensitive or unnecessary data from entering the graph.
  - How it’s used: Before querying a table (e.g., `accounts`), the script fetches all rules for that table. It dynamically builds its `SELECT` statement to only include columns where `is_excluded = false`. This is how PII like `hashed_password` is filtered out.
- `node_label_mappings`:
  - Purpose: Defines the translation from a SQL table name to a Neo4j node label.
  - How it’s used: When the script processes the `accounts` table, it looks it up here and finds it should create nodes with the label `:Account`.
- `relationship_mappings`:
  - Purpose: This is the most critical table. It defines all the relationships to create after the nodes are loaded.
  - How it’s used: The script loops through this table and uses the columns to generate the final Cypher `MATCH...WHERE...CREATE` statements. For example:
    - `from_label`: `Account`
    - `to_label`: `PaymentMethod`
    - `relationship_type`: `HAS_PAYMENT_METHOD`
    - `join_condition`: `a.account_id = pm.account_id`
  - This metadata-driven approach means you can add new relationships (even cross-database ones) without ever touching the Python code (see the sketch after this list).
- `etl_run_logs` & `etl_table_logs`:
  - Purpose: Provides a complete audit trail of the pipeline’s execution.
  - How it’s used: The script creates one `etl_run_logs` entry when it starts. It then creates an `etl_table_logs` entry for every table it’s about to process. As it completes each table, it updates the row with `status`, `rows_processed`, etc., giving you a granular, real-time view of the pipeline’s progress.
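The following is a minimal Python sketch of how these control tables can drive query generation. It assumes column names such as `table_name`, `column_name`, and `is_excluded`, and that every `join_condition` uses the `a`/`pm` aliases shown above; the actual `PostgresToNeo4jETL` implementation may differ.

```python
import psycopg2

# Minimal sketch of metadata-driven query generation.
# Assumed columns: table_name, column_name, is_excluded, from_label,
# to_label, relationship_type, join_condition. Real schema may differ.
control = psycopg2.connect(dbname="etl_control_db", user="postgres", password="...")

def build_select(table_name: str) -> str:
    """Build a SELECT that only includes columns not excluded for this table."""
    with control.cursor() as cur:
        cur.execute(
            "SELECT column_name FROM field_exclusion_rules "
            "WHERE table_name = %s AND is_excluded = false",
            (table_name,),
        )
        columns = [row[0] for row in cur.fetchall()]
    return f"SELECT {', '.join(columns)} FROM {table_name}"

def build_relationship_cypher() -> list:
    """Turn each relationship_mappings row into a MATCH...WHERE...CREATE statement."""
    statements = []
    with control.cursor() as cur:
        cur.execute(
            "SELECT from_label, to_label, relationship_type, join_condition "
            "FROM relationship_mappings"
        )
        for from_label, to_label, rel_type, join_condition in cur.fetchall():
            # NOTE: assumes the join_condition uses the a/pm aliases,
            # as in the HAS_PAYMENT_METHOD example above.
            statements.append(
                f"MATCH (a:{from_label}), (pm:{to_label}) "
                f"WHERE {join_condition} "
                f"CREATE (a)-[:{rel_type}]->(pm);"
            )
    return statements
```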
5. Pipeline Execution Flow (main.py)
The main.py script acts as the orchestrator, bringing all the components together. It operates in two distinct phases:
Phase 1: ETL (The PostgresToNeo4jETL Class)
This phase reads from all Postgres sources and generates a single, massive Cypher script.
- Init & Connect: The `main()` function instantiates `ETLConfig` (from `.env`) and passes it to the `PostgresToNeo4jETL` class. The class connects to the `etl_control_db`.
- Start Log: It inserts a new row into `etl_run_logs` to get a `log_id` for this session.
- Fetch Sources: It queries the `source_databases` table to get a list of all active databases to process.
- Loop & Extract Nodes: It loops through each source database and then through each table in that database. For every table (e.g., `profiles_db.accounts`):
  - It queries `field_exclusion_rules` to dynamically build a `SELECT` statement that omits columns like `hashed_password`.
  - It queries `node_label_mappings` to find the correct Neo4j label (e.g., `accounts` -> `:Account`).
  - It extracts all data from the table (e.g., `SELECT account_id, email, full_name... FROM accounts`).
  - It generates Cypher `CREATE` statements for every row and logs the task to `etl_table_logs`.
- Generate Relationships: After all nodes from all sources are generated, it queries the `relationship_mappings` table. It uses this metadata to build the complex, cross-database `MATCH...WHERE...CREATE` statements that connect the nodes.
- Write File: It writes all generated Cypher statements (for nodes and relationships) into a single timestamped file (e.g., `graph_output_20251102.cypher`) and registers it in the `cypher_scripts` table. A condensed sketch of this loop follows.
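Here is a condensed sketch of what the node-generation part of this loop can look like. The value escaping, helper names, and file naming are illustrative assumptions rather than the repo’s exact implementation; the exclusion-aware `SELECT` would come from metadata as shown in the section 4 sketch.

```python
from datetime import datetime

def cypher_value(value) -> str:
    """Render a Python value as a Cypher literal (None -> null, strings quoted)."""
    if value is None:
        return "null"
    if isinstance(value, (int, float)):
        return str(value)
    return "'" + str(value).replace("'", "\\'") + "'"

def generate_node_statements(source_conn, select_sql: str, label: str) -> list:
    """Run the exclusion-aware SELECT for one table and emit CREATE statements."""
    statements = []
    with source_conn.cursor() as cur:
        cur.execute(select_sql)  # e.g. the output of build_select() above
        columns = [desc[0] for desc in cur.description]
        for row in cur.fetchall():
            props = ", ".join(
                f"{col}: {cypher_value(val)}" for col, val in zip(columns, row)
            )
            statements.append(f"CREATE (:{label} {{{props}}});")
    return statements

# All node and relationship statements end up in one timestamped script,
# e.g. graph_output_20251102.cypher.
output_path = f"graph_output_{datetime.now():%Y%m%d}.cypher"
```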
Phase 2: Load (The Neo4jLoader Class)
This phase takes the generated script and executes it against Neo4j.
- Init & Connect: The `main()` function instantiates the `Neo4jLoader` with the Neo4j credentials from the `.env` file. The class connects to your Neo4j Aura instance.
- Clear Database (Optional): Because `clear_database=True` is set in `main()`, the loader first runs `MATCH (n) DETACH DELETE n` to wipe the graph clean.
- Read & Execute: It reads the `graph_output.cypher` file (passed to it from `main()`), splits its content into a list of individual Cypher statements, and executes them one by one.
- Update Logs: Finally, it connects to the `etl_control_db` one last time to update the `cypher_scripts` table with a `last_run_time` to log the successful load. A minimal sketch of the loader follows this list.
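A minimal sketch of the load phase, assuming the class wraps the official `neo4j` Python driver and that statements in the generated file are separated by semicolons; the class and method names here are illustrative, only the driver calls are real API.

```python
from neo4j import GraphDatabase

class Neo4jLoader:
    """Illustrative loader: executes a generated Cypher script against Neo4j."""

    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    def load(self, cypher_path: str, clear_database: bool = False) -> None:
        with self.driver.session() as session:
            if clear_database:
                session.run("MATCH (n) DETACH DELETE n")  # wipe the graph first
            with open(cypher_path, encoding="utf-8") as f:
                # Statements in the generated file are assumed to end with ';'.
                statements = [s.strip() for s in f.read().split(";") if s.strip()]
            for statement in statements:
                session.run(statement)

    def close(self) -> None:
        self.driver.close()
```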
6. Final Graph Schema
The pipeline automatically generates the following graph structure based on the rules in the control database:
Node Labels
- `(:Account)`
- `(:Plan)`
- `(:PaymentMethod)`
- `(:Subscription)`
- `(:Media)`
- `(:PlayEvent)`
- `(:Ticket)`
- `(:Tag)`
- `(:TicketTagLink)`
Relationships
- `(:Account)-[:HAS_PAYMENT_METHOD]->(:PaymentMethod)`
- `(:Account)-[:HAS_SUBSCRIPTION]->(:Subscription)`
- `(:Subscription)-[:SUBSCRIBED_TO]->(:Plan)`
- `(:PlayEvent)-[:PLAYED]->(:Media)`
- `(:Ticket)-[:TAGGED_WITH]->(:Tag)`
- `(:Account)-[:SUBMITTED]->(:Ticket)` (links `profiles_db` to `support_db`)
- `(:Account)-[:WATCHED]->(:PlayEvent)` (links `profiles_db` to `streaming_db`)
7. The “Payoff”: High-Impact Analytical Queries
With the unified graph, you can now run powerful analytical queries that would be impractical to express as cross-database SQL joins. This list contains the 10 queries that were validated to return data with the loaded sample set.
IMPORTANT: These queries may use multiple `MATCH` and `WITH` clauses. You must copy and run each query as a single, complete block from the first `MATCH` to the final `RETURN` or `LIMIT`. Running them line by line will cause an “Incomplete query” error.
FRAUD & CUSTOMER 360 ANALYTICS
1. PAYMENT FRAUD: Multiple accounts using same credit card
MATCH (a1:Account)-[:HAS_PAYMENT_METHOD]->(pm1:PaymentMethod)
MATCH (a2:Account)-[:HAS_PAYMENT_METHOD]->(pm2:PaymentMethod)
WHERE a1.account_id < a2.account_id
AND pm1.card_fingerprint = pm2.card_fingerprint
RETURN a1.email AS account_1,
a2.email AS account_2,
pm1.card_fingerprint AS shared_card_fingerprint,
pm1.card_type AS card_type,
'***' + pm1.card_last_four AS card_ending
ORDER BY shared_card_fingerprint;
2. CUSTOMER LIFETIME VALUE: Complete profile
MATCH (a:Account)
OPTIONAL MATCH (a)-[:HAS_SUBSCRIPTION]->(s:Subscription)-[:SUBSCRIBED_TO]->(p:Plan)
OPTIONAL MATCH (a)-[:WATCHED]->(pe:PlayEvent)-[:PLAYED]->(m:Media)
OPTIONAL MATCH (a)-[:SUBMITTED]->(t:Ticket)
WITH a, s, p,
COUNT(DISTINCT pe) AS total_streams,
COUNT(DISTINCT m) AS unique_content,
COUNT(DISTINCT t) AS tickets,
duration.between(a.created_at, datetime()).months AS months_active
RETURN a.email,
a.full_name,
p.plan_name AS plan,
toFloat(p.monthly_fee) * months_active AS estimated_revenue,
total_streams,
unique_content,
tickets,
months_active,
CASE
WHEN total_streams > 20 THEN 'Power User'
WHEN total_streams > 10 THEN 'Regular User'
WHEN total_streams > 0 THEN 'Casual User'
ELSE 'At-Risk (No Activity)'
END AS segment
ORDER BY estimated_revenue DESC
LIMIT 20;
3. CHURN PREDICTION: Find at-risk customers
// NOTE: datetime() is hardcoded to '2025-11-03' to work with sample data
WITH datetime('2025-11-03T00:00:00') AS now
MATCH (a:Account)-[:HAS_SUBSCRIPTION]->(s:Subscription)
OPTIONAL MATCH (a)-[:WATCHED]->(pe:PlayEvent)
WHERE pe.event_timestamp > now - duration({days: 30})
OPTIONAL MATCH (a)-[:SUBMITTED]->(t:Ticket)
WHERE t.created_at > now - duration({days: 30})
WITH a, s,
COUNT(DISTINCT pe) AS recent_streams,
COUNT(DISTINCT t) AS recent_tickets,
duration.between(a.created_at, now).months AS age_months
WHERE s.status IN ['active', 'Active']
RETURN a.email,
recent_streams,
recent_tickets,
age_months,
CASE
WHEN recent_streams = 0 AND recent_tickets > 0 THEN 'HIGH RISK - Problems + No Usage'
WHEN recent_streams = 0 THEN 'HIGH RISK - No Activity'
WHEN recent_streams < 5 THEN 'MEDIUM RISK - Low Engagement'
ELSE 'LOW RISK - Active User'
END AS churn_risk
ORDER BY
recent_streams ASC,
recent_tickets DESC
LIMIT 20;
4. GENRE PREFERENCES: What each user loves
MATCH (a:Account)-[:WATCHED]->(pe:PlayEvent)-[:PLAYED]->(m:Media)
WITH a, m.genre AS genre, COUNT(*) AS watches
ORDER BY a.account_id, watches DESC
WITH a, COLLECT({genre: genre, count: watches})[0..3] AS top_genres
RETURN a.email,
a.full_name,
[g IN top_genres | g.genre + ' (' + g.count + ' plays)'] AS favorite_genres
LIMIT 20;
BUSINESS INTELLIGENCE
5. REVENUE BY PLAN
MATCH (p:Plan)<-[:SUBSCRIBED_TO]-(s:Subscription)<-[:HAS_SUBSCRIPTION]-(a:Account)
WHERE s.status IN ['active', 'Active']
OPTIONAL MATCH (a)-[:WATCHED]->(pe:PlayEvent)
WITH p,
COUNT(DISTINCT a) AS subscribers,
toFloat(p.monthly_fee) * COUNT(DISTINCT a) AS monthly_revenue,
COUNT(DISTINCT pe) AS total_streams
RETURN p.plan_name,
subscribers,
monthly_revenue,
ROUND(toFloat(total_streams) / subscribers, 1) AS avg_streams_per_user,
p.max_devices
ORDER BY monthly_revenue DESC;
6. MOST POPULAR CONTENT
MATCH (m:Media)<-[:PLAYED]-(pe:PlayEvent)
WITH m,
COUNT(*) AS total_events,
COUNT(CASE WHEN pe.action IN ['play', 'START'] THEN 1 END) AS starts,
COUNT(CASE WHEN pe.action IN ['stop', 'FINISH'] THEN 1 END) AS completions
WHERE starts > 0
RETURN m.title,
m.genre,
m.media_type,
m.release_date,
starts AS play_starts,
completions,
ROUND(100.0 * completions / starts, 1) AS completion_rate_pct
ORDER BY starts DESC
LIMIT 20;
7. DEVICE ANALYSIS
MATCH (pe:PlayEvent)<-[:WATCHED]-(a:Account)
WITH pe.device_fingerprint AS device,
COUNT(*) AS usage_count,
COLLECT(DISTINCT a.email) AS users
ORDER BY usage_count DESC
LIMIT 10
OPTIONAL MATCH (a2:Account {email: users[0]})-[:SUBMITTED]->(t:Ticket)-[:TAGGED_WITH]->(tag:Tag)
WHERE tag.tag_name IN ['Technical', 'Playback']
RETURN device,
usage_count,
SIZE(users) AS unique_users,
COUNT(DISTINCT t) AS technical_tickets
ORDER BY usage_count DESC;
ADVANCED NETWORK ANALYSIS
8. COMPLETE CUSTOMER JOURNEY
MATCH (a:Account {email: 'john.smith@email.com'})
OPTIONAL MATCH (a)-[:HAS_SUBSCRIPTION]->(s:Subscription)-[:SUBSCRIBED_TO]->(p:Plan)
OPTIONAL MATCH (a)-[:HAS_PAYMENT_METHOD]->(pm:PaymentMethod)
OPTIONAL MATCH (a)-[:WATCHED]->(pe:PlayEvent)-[:PLAYED]->(m:Media)
OPTIONAL MATCH (a)-[:SUBMITTED]->(t:Ticket)-[:TAGGED_WITH]->(tag:Tag)
RETURN a.email AS customer,
a.created_at AS signup_date,
p.plan_name AS plan,
s.status AS subscription_status,
pm.card_type + ' •••• ' + pm.card_last_four AS payment,
COUNT(DISTINCT m) AS content_watched,
COUNT(DISTINCT pe) AS total_streams,
COUNT(DISTINCT t) AS tickets,
COLLECT(DISTINCT tag.tag_name)[0..5] AS ticket_topics;
9. MOST ACTIVE STREAMERS
MATCH (a:Account)-[:WATCHED]->(pe:PlayEvent)-[:PLAYED]->(m:Media)
WITH a,
COUNT(DISTINCT pe) AS total_streams,
COUNT(DISTINCT m) AS unique_content,
COUNT(DISTINCT m.genre) AS genres_watched,
COLLECT(DISTINCT m.genre)[0..5] AS top_genres
ORDER BY total_streams DESC
LIMIT 10
RETURN a.email,
a.full_name,
total_streams,
unique_content,
genres_watched,
top_genres;
10. ACCOUNT ACTIVITY HEATMAP
MATCH (pe:PlayEvent)
WITH pe.event_timestamp.hour AS hour_of_day, COUNT(*) AS streams
ORDER BY hour_of_day
RETURN hour_of_day,
streams,
CASE
WHEN streams > 5 THEN 'Peak'
WHEN streams > 3 THEN 'High'
WHEN streams > 1 THEN 'Medium'
ELSE 'Low'
END AS activity_level
ORDER BY hour_of_day;
8. How to Run
- Setup PostgreSQL:
  - Create your four databases: `etl_control_db`, `profiles_db`, `streaming_db`, `support_db`.
  - Run all the `_schema.sql` and `sample_data.sql` scripts to create the tables and load the sample data.
  - Important: You must update the `source_databases` table inside `etl_control_db` with the correct `db_password` for your three source DBs.
- Environment Setup:
  - Create a `.env` file in the root directory:

    ```
    # Credentials for the main Control Database
    ETL_DB_HOST=localhost
    ETL_DB_PORT=5432
    ETL_DB_NAME=etl_control_db
    ETL_DB_USER=your_postgres_user
    ETL_DB_PASSWORD=your_postgres_password

    # Credentials for the target Neo4j Aura Database
    NEO4J_URI=neo4j+s://your-aura-db-id.databases.neo4j.io
    NEO4J_USERNAME=neo4j
    NEO4J_PASSWORD=your_neo4j_password
    ```

  - Install Python dependencies:

    ```
    pip install psycopg2-binary neo4j python-dotenv
    ```

- Run the Pipeline:
  - Execute the main script. This will:
    - Read all data from Postgres.
    - Generate a massive `graph_output_[timestamp].cypher` file.
    - Connect to Neo4j (clearing it first, if `clear_database=True`).
    - Execute the generated Cypher script to build the graph.

    ```
    python main.py
    ```