-
Notifications
You must be signed in to change notification settings - Fork 14
Expand file tree
/
Copy path01-example-inspect.yaml
More file actions
81 lines (72 loc) · 2.91 KB
/
01-example-inspect.yaml
File metadata and controls
81 lines (72 loc) · 2.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# $schema: https://axual.github.io/ksml/latest/ksml-language-spec.json
# This example shows how to read from different streams and log all messages
# The streams section defines input, output and intermediate topics. These are referred to by name in pipelines.
# Alternatively, these definitions may be inlined in the respective parts of pipelines (see the XML example below).
streams:
  # Each entry maps a stream name (referenced by the pipelines' "from") to a topic and its key/value types.
  # AVRO-encoded sensor data. This is the only stream here that sets an explicit offsetResetPolicy.
  sensor_source_avro:
    topic: ksml_sensordata_avro
    keyType: string
    valueType: avro:SensorData
    offsetResetPolicy: latest
  # CSV-encoded sensor data.
  sensor_source_csv:
    topic: ksml_sensordata_csv
    keyType: string
    valueType: csv:SensorData
  # JSON-encoded sensor data. Unlike the other streams, no schema name is attached to the value type.
  sensor_source_json:
    topic: ksml_sensordata_json
    keyType: string
    valueType: json
  # JSON Schema-encoded sensor data.
  sensor_source_jsonschema:
    topic: ksml_sensordata_jsonschema
    keyType: string
    valueType: jsonschema:SensorData
  # Protobuf-encoded sensor data.
  sensor_source_protobuf:
    topic: ksml_sensordata_protobuf
    keyType: string
    valueType: protobuf:sensor_data
# This section defines (reusable) functions. Each function has a type associated with it, which allows KSML to
# automatically apply the minimal set of parameters and the correct return type to it. If no type is specified,
# then the type "general" is assumed.
functions:
  # Log the message using the function's Java Logger, injected by KSML as the variable "log".
  log_message:
    type: forEach
    # The parameter section _adds_ parameters to the type defined above. Therefore, the normal "key" and "value"
    # arguments of the "forEach" function type are still passed in implicitly.
    parameters:
      - name: format
        type: string
    # Single expression; "format" is the extra parameter declared above, "key" and "value" come from forEach.
    code: log.info("Consumed {} message - key={}, value={}", format, key, value)
# The pipelines section defines named pipelines. Each pipeline starts by consuming from an input stream,
# table or globalTable and passing the data through a series of steps to a sink operation such as forEach.
pipelines:
  # Every pipeline logs its own message, passing in the additional format parameter to log_message above
  consume_avro:
    from: sensor_source_avro
    forEach:
      code: log_message(key, value, format="AVRO")

  consume_csv:
    from: sensor_source_csv
    forEach:
      code: log_message(key, value, format="CSV")

  consume_json:
    from: sensor_source_json
    forEach:
      code: log_message(key, value, format="JSON")

  consume_jsonschema:
    from: sensor_source_jsonschema
    forEach:
      code: log_message(key, value, format="JSON Schema")

  consume_protobuf:
    from: sensor_source_protobuf
    forEach:
      code: log_message(key, value, format="PROTOBUF")

  # This pipeline shows that you can also inline the input stream definition, instead of using the streams
  # section above.
  consume_xml:
    from:
      topic: ksml_sensordata_xml
      keyType: string
      valueType: xml:SensorData
    forEach:
      code: log_message(key, value, format="XML")