# 13-example-join.yaml
# $schema: https://axual.github.io/ksml/latest/ksml-language-spec.json
# This example shows how to read sensor data from an Avro stream, join with the alert settings stored in a table and
# produce alerts on a topic for the sensor data that should trigger alerts
# Topics that the pipelines below read from or write to as record streams.
streams:
  # Raw sensor readings (Avro SensorData, string key); input of the `prepare` pipeline.
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
    offsetResetPolicy: latest

  # Sensor readings re-keyed by city; written by `prepare`, read by `join`.
  sensor_transformed:
    topic: ksml_sensordata_transformed
    keyType: string
    valueType: avro:SensorData

  # Triggered alerts, one record per (reading, alert setting) pair; key and value are JSON.
  sensor_alerts:
    topic: ksml_sensoralert
    keyType: json
    valueType: json
# Topics consumed as tables; used as the lookup side of the join in the `join` pipeline.
tables:
  sensor_alert_settings:
    topic: ksml_sensoralert_settings
    keyType: string
    valueType: avro:SensorAlertSettings
functions:
  # Value joiner for the `join` pipeline.
  #   value1: the sensor reading from the stream side of the join.
  #   value2: the alert settings record from the table side (may be None or
  #           carry a None "alertSettings" list, in which case no alert triggers).
  # Returns a JSON object {"sensordata": <reading>, "alerts": [<triggered settings>]}.
  alert_join:
    type: valueJoiner
    code: |
      log.debug('JOINING\n\t value1={}\n\t value2={}', value1, value2)
      sensordata = value1
      # Strip the "@type"/"@schema" entries from the reading before it is
      # embedded in the JSON result. pop() with a default is used instead of
      # `del` so a reading that lacks these keys does not raise KeyError.
      sensordata.pop("@type", None)
      sensordata.pop("@schema", None)
      triggeredAlertSettings = []
      if value2 is not None and value2["alertSettings"] is not None:
        for setting in value2["alertSettings"]:
          # A setting triggers only for readings of the same type and unit,
          # and only when the reading is below "alertBelow" or above
          # "alertAbove" (a None threshold disables that side of the check).
          if (
            setting["type"] == sensordata["type"]
            and setting["unit"] == sensordata["unit"]
            and (
              (setting["alertBelow"] is not None and setting["alertBelow"] > sensordata["value"])
              or (setting["alertAbove"] is not None and setting["alertAbove"] < sensordata["value"])
            )
          ):
            log.info(
              'Triggered alert {}, type={}, unit={}, value={}, alertAbove={}, alertBelow={}',
              setting["name"], setting["type"], setting["unit"],
              sensordata["value"], setting["alertAbove"], setting["alertBelow"]
            )
            triggeredAlertSettings.append(setting)
      new_value = {
        "sensordata": sensordata,
        "alerts": triggeredAlertSettings
      }
      log.debug('JOINED sensordata= {} alerts= {}', new_value["sensordata"], new_value["alerts"])
    expression: new_value
    resultType: json

  # Splits one join result into one output record per triggered alert.
  #   value: the object produced by alert_join ({"sensordata": ..., "alerts": [...]}).
  # Returns a list of (key, value) tuples; the list is empty when value is None
  # or no alerts were triggered. Every record of one input shares the same key,
  # built from the reading's name, type and city.
  alert_split:
    type: keyValueToKeyValueListTransformer
    code: |
      newRecords = []
      # A missing or None "alerts" entry is treated the same as an empty list.
      alerts = []
      if value is not None:
        alerts = value.get("alerts") or []
      if alerts:
        sensordata = value["sensordata"]
        new_key = {
          "name": sensordata["name"],
          "type": sensordata["type"],
          "city": sensordata["city"]
        }
        for alert in alerts:
          new_value = {
            "alert": alert,
            "sensordata": sensordata
          }
          # Emit tuples so each element matches the declared
          # resultType list(tuple(json,json)).
          newRecords.append((new_key, new_value))
      log.debug("Returning {} records", len(newRecords))
    expression: newRecords
    resultType: list(tuple(json,json))
pipelines:
  # Step 1: drop records without a value and re-key each reading by its city,
  # then write the result to the intermediate `sensor_transformed` stream.
  prepare:
    from: sensor_source
    via:
      - name: filter_nulls
        type: filter
        if:
          expression: value is not None
      - name: rekey_sensordata
        type: transformKeyValue
        mapper:
          resultType: tuple(string,avro:SensorData)
          expression: (value["city"],value)
    to: sensor_transformed

  # Step 2: join each re-keyed reading with the `sensor_alert_settings` table,
  # expand the join result into one record per triggered alert and publish
  # those records to `sensor_alerts`.
  join:
    from: sensor_transformed
    via:
      - name: alert_joining
        type: join
        table: sensor_alert_settings
        valueJoiner: alert_join
      - name: log_join_result
        type: peek
        forEach:
          code: log.debug("JOIN RESULT - key={}, value={}", key, value)
      - name: expand_alerts
        type: transformKeyValueToKeyValueList
        mapper: alert_split
      - name: log_alert
        type: peek
        forEach:
          code: log.info("ALERT - key={}, value={}", key, value)
    to: sensor_alerts

  # Step 3: read the published alerts back and log each one at debug level.
  verify:
    from: sensor_alerts
    forEach:
      code: log.debug("OUTPUT - key={}, value={}", key, value)