A production-minded ETL pipeline that demonstrates data engineering maturity with idempotent merges, field ownership enforcement, derived metrics, anomaly detection, and freshness tracking.
**Assignment Answers:** See `docs/DOCUMENTATION.md` for complete answers to all assignment questions (Steps 1-4), including analytical queries with explanations, schema design rationale, and assumptions.
This system implements a full ETL pipeline that:
- Loads CSV data into staging tables
- Applies idempotent merge with hash-based change detection
- Enforces per-field source-of-truth ownership (EMR vs. Zingage)
- Handles late-arriving data & backfills safely
- Materializes derived analytics without mutating source truth
- Flags anomalies instead of corrupting data
- Tracks freshness & time-decay SLAs
- Executes via orchestrated job graph
CSV Source → Staging DB → Deterministic Merge Jobs → Warehouse Tables → Derived Metrics → Exports/Dashboards

- Staging → Merge pattern: never transform in place
- Idempotency: Hash-based change detection, update-on-change only
- Ownership enforcement: EMR fields never overwritten by derived fields
- Race safety: Single-writer merge jobs
- Time-decay freshness: Actor-aware time budgets
- Anomaly logging: Log issues instead of corrupting data
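The hash-based, update-on-change merge principle above can be sketched as follows. This is a minimal illustration, not the project's actual `src/lib/hash.ts`; the row shape and field names are assumptions.

```typescript
import { createHash } from "node:crypto";

// Hypothetical row shape; the real schema may differ.
interface CaregiverRow {
  caregiver_id: string;
  first_name: string;
  last_name: string;
  status: string;
}

// Deterministic hash over the owned fields: the same input always yields
// the same hash, so a re-run produces identical hashes and the merge
// skips the write entirely — this is what makes the merge idempotent.
function rowHash(row: CaregiverRow): string {
  const canonical = JSON.stringify([
    row.caregiver_id,
    row.first_name,
    row.last_name,
    row.status,
  ]);
  return createHash("sha256").update(canonical).digest("hex");
}

// Update-on-change only: compare the incoming hash with the stored one.
function needsUpdate(storedHash: string | null, incoming: CaregiverRow): boolean {
  return storedHash !== rowHash(incoming);
}
```

Running the pipeline twice with identical CSVs then results in zero warehouse updates on the second pass, since every stored hash matches its incoming row.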
```
/
├── src/
│   ├── etl/
│   │   ├── jobs/                  # Individual ETL jobs
│   │   │   ├── ingest_csv_staging.ts
│   │   │   ├── merge_caregivers.ts
│   │   │   ├── merge_visits.ts
│   │   │   ├── compute_daily_hours.ts
│   │   │   ├── anomaly_checks.ts
│   │   │   └── freshness_status.ts
│   │   └── orchestrator.ts        # ETL DAG orchestration
│   └── lib/
│       ├── db.ts                  # Database connection
│       ├── logger.ts              # Logging utility
│       ├── hash.ts                # Hash generation for change detection
│       └── types.ts               # TypeScript type definitions
├── migrations/                    # Knex database migrations
├── tests/                         # Test scripts
├── data/                          # CSV input files (create this directory)
├── docker/                        # Docker configuration
└── sql/                           # SQL initialization scripts
```
- Docker and Docker Compose
- Node.js 20+ (for local development)
- npm or yarn
1. Clone and install dependencies:

   `npm install`

2. Start Docker services:

   `docker-compose up -d`

   This starts:
   - PostgreSQL (port 5432)
   - pgAdmin (port 5050)
   - ETL container (for running jobs)

   Wait for services to become healthy (~10-15 seconds), then verify with:

   `docker-compose ps`

3. Run database migrations:

   `npm run migrate`

   This creates all necessary tables: staging, warehouse, derived, and observability tables.

4. Ingest CSV data. Place your CSV files in the root directory or the `data/` directory:

   - `caregiver_data_20250415_sanitized.csv`
   - `carelog_data_20250415_sanitized.csv`

5. Run the full ETL pipeline:

   `npm run etl:full`
This executes the complete ETL DAG:
- CSV ingestion to staging
- Duplicate detection (caregivers & visits)
- Merge caregivers
- Merge visits (first pass)
- Resolve orphaned visits
- Re-merge visits (with resolutions)
- Compute daily hours
- Anomaly checks
- Freshness status update
1. Navigate to the frontend directory:

   `cd frontend/frontend`

2. Install dependencies:

   `npm install`

3. Start the frontend dev server:

   `npm run dev`

   The frontend runs on `http://localhost:5173`.

4. From the project root, start the backend server:

   `npm run start:server`

   The backend runs on `http://localhost:4000`.

5. Configure the frontend (if needed) by creating a `frontend/frontend/.env` file:

   `VITE_API_BASE_URL=http://localhost:4000`
The pipeline executes the following jobs in order:
- ingest_csv_staging: Loads CSV files into staging tables
- detect_duplicate_caregivers: Detects and removes duplicate caregivers (exact + semantic)
- detect_duplicate_visits: Detects and removes duplicate visits (exact + overlapping)
- merge_caregivers: Idempotent merge of caregivers with hash-based change detection
- merge_visits (first pass): Idempotent merge of visits with derived fields (duration, flags)
- resolve_orphaned_visits: Resolves orphaned visits using multiple strategies
- merge_visits (re-merge): Re-merges resolved visits with resolution metadata
- compute_daily_hours: Aggregates visit durations per caregiver per date
- anomaly_checks: Detects and logs data quality issues
- freshness_status: Updates freshness metadata and SLA status
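The ordered job list above amounts to a linear DAG executed one job at a time. A minimal sketch of that execution loop is shown below; the `Job` type and `runPipeline` helper are illustrative assumptions, not the actual `orchestrator.ts` API.

```typescript
// Hypothetical job interface; each job is idempotent, so a failed
// pipeline can simply be re-run from the start without duplicating data.
type Job = { name: string; run: () => Promise<void> };

// Execute jobs strictly in order (single-writer: no two merge jobs
// touch the warehouse concurrently), returning the completed job names.
async function runPipeline(jobs: Job[]): Promise<string[]> {
  const completed: string[] = [];
  for (const job of jobs) {
    await job.run();
    completed.push(job.name);
  }
  return completed;
}
```

Because every job is safe to re-run, retry logic can stay trivial: on failure, rerun the whole pipeline rather than tracking partial state.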
- **Individual jobs:** `npm run etl:ingest`, `etl:merge-caregivers`, `etl:merge-visits`, `etl:daily-hours`, `etl:anomalies`, `etl:freshness`
- **Testing:** `npm run test:etl`, `npm run test:idempotency`
- **Database:** PostgreSQL at `localhost:5432`, pgAdmin at `http://localhost:5050`
For complete usage guide, see docs/SETUP.md.
- **Staging:** `caregivers_staging`, `visits_staging` (temporary)
- **Warehouse:** `caregivers`, `visits` (source of truth)
- **Derived:** `caregiver_daily_hours` (aggregated metrics)
- **Observability:** `anomaly_log`, `freshness_metadata`, `visit_duplicates`, `caregiver_duplicates`, `orphaned_visit_resolutions`, `unresolved_anomalies`
For complete schema details, see docs/DOCUMENTATION.md section "Database Schema Reference".
| Field Category | Owner |
|---|---|
| Caregiver demographics | EMR |
| Raw clock times | EMR |
| Derived durations & flags | Zingage ETL |
| Aggregations | Zingage ETL |
Rule: EMR fields are never overwritten by derived fields. Only new derived fields are added.
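The ownership rule above can be enforced mechanically at merge time by filtering patches against a set of EMR-owned columns. The sketch below is illustrative: the field lists and function names are assumptions, not the project's actual configuration.

```typescript
// Assumed EMR-owned columns; the real list would come from the
// ownership table above (demographics + raw clock times).
const EMR_OWNED = new Set([
  "first_name",
  "last_name",
  "clock_in_actual_datetime",
  "clock_out_actual_datetime",
]);

// Apply a derived-field patch to an existing warehouse row, dropping
// any key the ETL does not own so EMR truth is never overwritten.
function applyDerivedPatch(
  existing: Record<string, unknown>,
  patch: Record<string, unknown>
): Record<string, unknown> {
  const safe: Record<string, unknown> = { ...existing };
  for (const [key, value] of Object.entries(patch)) {
    if (!EMR_OWNED.has(key)) safe[key] = value;
  }
  return safe;
}
```

With this guard in the merge path, a buggy downstream job can at worst write wrong derived values; it can never clobber source-of-truth EMR data.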
| SLA Type | Requirement |
|---|---|
| App UX | ≤ 5–15 min staleness acceptable |
| Ops dashboard | Hourly freshness budget |
| Payroll | Strict daily boundary |
Freshness status is tracked in freshness_metadata:
- `fresh`: data is within the acceptable SLA
- `stale`: data exceeds the hourly budget
- `critical`: data exceeds the daily boundary
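A time-decay classification like this reduces to comparing a row's age against the hourly and daily budgets. The sketch below is a minimal illustration with assumed thresholds (1 hour, 24 hours), not the actual `freshness_status.ts` logic.

```typescript
type Freshness = "fresh" | "stale" | "critical";

// Classify a record's freshness from its last-updated timestamp.
// Thresholds are illustrative stand-ins for the SLA budgets above.
function classifyFreshness(lastUpdated: Date, now: Date): Freshness {
  const HOUR_MS = 60 * 60 * 1000;
  const ageMs = now.getTime() - lastUpdated.getTime();
  if (ageMs > 24 * HOUR_MS) return "critical"; // past the daily boundary
  if (ageMs > HOUR_MS) return "stale";         // past the hourly budget
  return "fresh";                              // within SLA
}
```

A job can run this per table and persist the result into `freshness_metadata`, so dashboards read a precomputed status instead of recomputing age on every query.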
The testing approach is designed for gradual verification:
1. **Test full pipeline** (`test:etl`):
   - Runs the complete ETL pipeline
   - Verifies record counts
   - Shows sample queries

2. **Test idempotency** (`test:idempotency`):
   - Runs the pipeline twice with the same data
   - Verifies no duplicates are created
   - Tests hash-based change detection

3. **Manual testing:**
   - Run individual jobs
   - Query the database directly
   - Check anomaly logs
   - Monitor freshness status
CSV files should be placed in the root directory or the `data/` folder:

- `caregiver_data_20250415_sanitized.csv` (or any file matching `caregiver_data*.csv`)
- `carelog_data_20250415_sanitized.csv` (or any file matching `carelog_data*.csv`)
The caregivers CSV should include: `caregiver_id`, `email`, `first_name`, `last_name`, `phone_number`, `status`, etc.

The visits CSV should include: `carelog_id`, `caregiver_id`, `parent_id`, `start_datetime`, `end_datetime`, `clock_in_actual_datetime`, `clock_out_actual_datetime`, etc.
Timestamps should use the `YYYY-MM-DD HH:MM:SS` format (ISO 8601 with a space separator).
Note: the system automatically detects CSV files in the root directory or the `data/` folder.
- **Visit completion:** scheduled time exists, clock-in/out both exist, duration > 5 minutes
- **Reliability:** missing clock-out, late start (> 10 min), no-show
- **Documentation quality:** comments ≥ 100 characters count as detailed
- **Outliers:** negative or > 24 h duration, short duration (< 5 min)
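The outlier and reliability rules above can be expressed as pure flag functions, which keeps anomaly detection testable and lets the pipeline log flags instead of mutating data. This sketch is illustrative; the actual `anomaly_checks.ts` may use different names and thresholds.

```typescript
// Hypothetical minimal visit shape for anomaly checks.
interface VisitTimes {
  clockIn: Date | null;
  clockOut: Date | null;
}

// Return a list of anomaly flags for one visit; an empty list means clean.
// Flags are logged to anomaly_log — the visit row itself is never altered.
function visitAnomalies(v: VisitTimes): string[] {
  const flags: string[] = [];
  if (!v.clockOut) flags.push("missing_clock_out");
  if (v.clockIn && v.clockOut) {
    const minutes = (v.clockOut.getTime() - v.clockIn.getTime()) / 60_000;
    if (minutes < 0) flags.push("negative_duration");
    else if (minutes < 5) flags.push("short_duration");
    if (minutes > 24 * 60) flags.push("over_24h_duration");
  }
  return flags;
}
```

Because the function only reports flags, a bad clock value surfaces in the anomaly log for review rather than silently corrupting derived hours.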
For complete analytical queries, see docs/DOCUMENTATION.md section "Analytical Queries Explained".
For troubleshooting help, see docs/SETUP.md section "Troubleshooting".
Create a `.env` file (use `.env.example` as a template):

```
DB_HOST=localhost
DB_PORT=5432
DB_NAME=zingage_etl
DB_USER=postgres
DB_PASSWORD=postgres
CSV_DATA_DIR=./data
LOG_LEVEL=info
```

- `docs/DOCUMENTATION.md`: complete answers to all assignment questions (Steps 1-4), analytical queries with explanations, schema reference, anomaly detection & resolution
- `docs/PIPELINE.md`: complete pipeline architecture, merge mechanics, design decisions
- `docs/SETUP.md`: setup instructions, usage guide, troubleshooting
- ✅ Frontend Integration: frontend built and connected to the backend API
- ✅ API Layer: REST API created for SQL execution and ETL job triggering
- Scheduling: Add cron/job scheduler for automated runs
- Monitoring: Add Prometheus metrics and alerting
- Backfill Support: Enhanced handling of historical data loads
MIT