-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy path02-example-copy.yaml
More file actions
39 lines (35 loc) · 1.21 KB
/
02-example-copy.yaml
File metadata and controls
39 lines (35 loc) · 1.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# $schema: https://axual.github.io/ksml/latest/ksml-language-spec.json
# This example shows how to read from a simple stream and copy every message to a target topic.
# Named stream definitions. A pipeline can refer to one of these by name
# (e.g. `from: sensor_source`) instead of repeating the topic details inline.
streams:
  # Input stream: string keys, values typed as avro:SensorData.
  sensor_source:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
    # NOTE(review): presumably controls where reading starts when no offset
    # has been committed yet -- confirm against the KSML stream definition docs.
    offsetResetPolicy: latest
  # Output stream: same key/value types as the input, different topic.
  sensor_copy:
    topic: ksml_sensordata_copy
    keyType: string
    valueType: avro:SensorData
# Both pipelines below read topic ksml_sensordata_avro, log every message via
# a `peek` operation, and write the message to topic ksml_sensordata_copy.
pipelines:
  # The main pipeline reads messages, outputs them and saves a copy in a
  # target topic. Source and target are referenced by their stream names.
  main_copy_pipeline:
    from: sensor_source
    via:
      - type: peek
        forEach:
          code: log.info("MAIN PIPELINE - key={}, value={}", key, value)
    to: sensor_copy

  # The secondary pipeline does exactly the same, but by inlining the source
  # and target topics instead of referencing an earlier definition.
  # NOTE(review): the inline source does not set offsetResetPolicy, whereas
  # the named sensor_source stream sets it to `latest` -- confirm whether the
  # two pipelines are meant to start reading from the same position.
  secondary_copy_pipeline:
    from:
      topic: ksml_sensordata_avro
      keyType: string
      valueType: avro:SensorData
    via:
      - type: peek
        forEach:
          code: log.info("SECONDARY PIPELINE - key={}, value={}", key, value)
    to:
      topic: ksml_sensordata_copy
      # No need to explicitly reference key/value types, since they are
      # derived from previous steps in the pipeline