-
Notifications
You must be signed in to change notification settings - Fork 71
Expand file tree
/
Copy pathETLPipeline.yaml
More file actions
69 lines (63 loc) · 2.12 KB
/
ETLPipeline.yaml
File metadata and controls
69 lines (63 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# ETL Pipeline example - comprehensive multi-step chain.
#
# The tasks hand data to each other through temporary tables, so they depend on
# pg_timetable running every non-autonomous task of a chain inside one shared
# transaction (one database session). None of these tasks may be marked
# `autonomous: true`: an autonomous task runs outside the chain transaction and
# cannot see temp tables created by the other tasks.
chains:
  - name: "ETL Pipeline"
    schedule: "0 2 * * *"  # Daily at 2 AM
    live: true
    max_instances: 1
    timeout: 3600000       # 1 hour timeout (milliseconds)
    tasks:
      - name: "Extract sales data"
        # Snapshot of the previous calendar day's raw sales. ON COMMIT DROP
        # discards the table when the chain transaction ends; a plain TEMP
        # table would survive for the rest of the session, and a later run
        # reusing that session would fail with "relation already exists".
        kind: "SQL"
        command: |
          CREATE TEMP TABLE temp_sales_extract ON COMMIT DROP AS
          SELECT
              sales_id,
              customer_id,
              customer_name,
              product_id,
              amount,
              created_date
          FROM sales_raw
          -- Half-open window [yesterday, today): consecutive daily runs do not
          -- re-load the rows created between midnight and the 2 AM start.
          WHERE created_date >= CURRENT_DATE - INTERVAL '1 day'
            AND created_date < CURRENT_DATE
      - name: "Validate extracted data"
        # Raises an error when the extract window is empty, so the chain
        # stops before loading and logging an empty run.
        kind: "SQL"
        command: |
          DO $$
          DECLARE
              row_count INTEGER;
          BEGIN
              SELECT COUNT(*) INTO row_count FROM temp_sales_extract;
              IF row_count = 0 THEN
                  RAISE EXCEPTION 'No data extracted for processing';
              END IF;
              RAISE NOTICE 'Extracted % rows for processing', row_count;
          END $$
      - name: "Transform data"
        # Deliberately NOT autonomous: it must run inside the chain transaction
        # to read temp_sales_extract and to leave temp_sales_transformed
        # visible to the load task that follows.
        kind: "SQL"
        command: |
          CREATE TEMP TABLE temp_sales_transformed ON COMMIT DROP AS
          SELECT
              sales_id,
              customer_id,
              UPPER(TRIM(customer_name)) AS customer_name,
              product_id,
              ROUND(amount, 2) AS amount,
              created_date
          FROM temp_sales_extract
          WHERE amount > 0  -- also excludes rows whose amount is NULL
      - name: "Load to data warehouse"
        # Upsert keyed on sales_id. ON CONFLICT (sales_id) requires a unique
        # index or primary key on sales_warehouse.sales_id (assumed; confirm
        # against the warehouse DDL). All non-key columns are refreshed so a
        # re-loaded sale never keeps stale customer/product attributes.
        kind: "SQL"
        command: |
          INSERT INTO sales_warehouse
              (sales_id, customer_id, customer_name, product_id, amount, processed_date)
          SELECT
              sales_id,
              customer_id,
              customer_name,
              product_id,
              amount,
              CURRENT_DATE
          FROM temp_sales_transformed
          ON CONFLICT (sales_id) DO UPDATE SET
              customer_id    = EXCLUDED.customer_id,
              customer_name  = EXCLUDED.customer_name,
              product_id     = EXCLUDED.product_id,
              amount         = EXCLUDED.amount,
              processed_date = EXCLUDED.processed_date
      - name: "Update processing log"
        # Last task in the chain: records how many transformed rows were
        # loaded. Running in the shared chain transaction means the SUCCESS
        # row is committed only together with the loaded data.
        kind: "SQL"
        command: |
          INSERT INTO etl_log (process_name, process_date, rows_processed, status)
          SELECT
              'ETL Pipeline',
              CURRENT_DATE,
              COUNT(*),
              'SUCCESS'
          FROM temp_sales_transformed
        ignore_error: false