Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apis/fluentd/v1alpha1/plugins/output/es.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type ElasticsearchCommon struct {
IlmPolicyOverwrite *bool `json:"ilmPolicyOverride,omitempty"`
// Optional, Enable logging of 400 reason without enabling debug log level
LogEs400Reason *bool `json:"logEs400Reason,omitempty"`
// Optional, Configure bulk_message_request_threshold splitting threshold size.
// Default value is -1 (unlimited).
// If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
BulkMessageRequestThreshold *int32 `json:"bulkMessageRequestThreshold,omitempty"`
}

type Elasticsearch struct {
Expand Down
2 changes: 1 addition & 1 deletion apis/fluentd/v1alpha1/plugins/output/opensearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Opensearch struct {
IgnoreExceptions *string `json:"ignoreExceptions,omitempty"`
// Optional, Backup chunk when ignore exception occurs (default: true)
ExceptionBackup *bool `json:"exceptionBackup,omitempty"`
// Optional, Configure bulk_message request splitting threshold size (default: -1 unlimited)
// Optional, Configure bulk_message_request_threshold splitting threshold size (default: -1 unlimited)
BulkMessageRequestThreshold *int32 `json:"bulkMessageRequestThreshold,omitempty"`
// Optional, Specify the application name for the rollover index to be created (default: default)
ApplicationName *string `json:"applicationName,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions apis/fluentd/v1alpha1/plugins/output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ func (o *Output) elasticsearchPluginCommon(cmn *ElasticsearchCommon, parent *par
params.InsertPairs(parent, "ilm_policy", cmn.IlmPolicy)
params.InsertPairs(parent, "ilm_policy_overwrite", cmn.IlmPolicyOverwrite)
params.InsertPairs(parent, "log_es_400_reason", cmn.LogEs400Reason)
params.InsertPairs(parent, "bulk_message_request_threshold", cmn.BulkMessageRequestThreshold)

Comment on lines 396 to 400
return parent, nil
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @a2170d34e9940ec56d328100e375c43e
<match>
namespaces default,kube-system
</match>
</route>
</match>
<label @a2170d34e9940ec56d328100e375c43e>
<match **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-es-bulk-0
@type elasticsearch
bulk_message_request_threshold 20000000
host elasticsearch-logging-data.kubesphere-logging-system.svc
logstash_format true
logstash_prefix ks-logstash-log
port 9200
</match>
</label>
5 changes: 5 additions & 0 deletions apis/fluentd/v1alpha1/tests/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func Test_ClusterCfgOutput2ES(t *testing.T) {
testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ES}, "./expected/fluentd-cluster-cfg-output-es.cfg", false)
}

func Test_ClusterCfgOutput2ESBulkThreshold(t *testing.T) {
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{})
testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ESBulkThreshold}, "./expected/fluentd-cluster-cfg-output-es-bulk-threshold.cfg", false)
}

func Test_ClusterCfgOutput2ESDataStream(t *testing.T) {
sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{})
testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ESDataStream}, "./expected/fluentd-cluster-cfg-output-es-data-stream.cfg", false)
Expand Down
19 changes: 19 additions & 0 deletions apis/fluentd/v1alpha1/tests/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,24 @@ spec:
logstashPrefix: ks-logstash-log
`

FluentdclusterOutput2ESBulkThreshold fluentdv1alpha1.ClusterOutput
FluentdclusterOutput2ESBulkThresholdRaw = `
apiVersion: fluentd.fluent.io/v1alpha1
kind: ClusterOutput
metadata:
name: fluentd-output-es-bulk
labels:
output.fluentd.fluent.io/enabled: "true"
spec:
outputs:
- elasticsearch:
host: elasticsearch-logging-data.kubesphere-logging-system.svc
port: 9200
logstashFormat: true
logstashPrefix: ks-logstash-log
bulkMessageRequestThreshold: 20000000
`

FluentdclusterOutput2ESDataStream fluentdv1alpha1.ClusterOutput
FluentdclusterOutput2ESDataStreamRaw = `
apiVersion: fluentd.fluent.io/v1alpha1
Expand Down Expand Up @@ -1226,6 +1244,7 @@ func init() {
MustParseIntoObject(FluentdClusterOutputBufferRaw, &FluentdClusterOutputBuffer)
MustParseIntoObject(FluentdClusterOutputMemoryBufferRaw, &FluentdClusterOutputMemoryBuffer)
MustParseIntoObject(FluentdclusterOutput2ESRaw, &FluentdclusterOutput2ES)
MustParseIntoObject(FluentdclusterOutput2ESBulkThresholdRaw, &FluentdclusterOutput2ESBulkThreshold)
MustParseIntoObject(FluentdclusterOutput2ESDataStreamRaw, &FluentdclusterOutput2ESDataStream)
MustParseIntoObject(FluentdclusterOutput2CopyESDataStreamRaw, &FluentdclusterOutput2CopyESDataStream)
MustParseIntoObject(FluentdOutput2ES1Raw, &FluentdOutput2ES1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,13 @@ spec:
elasticsearch:
description: out_es plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -869,6 +876,13 @@ spec:
elasticsearchDataStream:
description: out_es datastreams plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -2195,8 +2209,8 @@ spec:
the rollover index to be created (default: default)'
type: string
bulkMessageRequestThreshold:
description: 'Optional, Configure bulk_message request splitting
threshold size (default: -1 unlimited)'
description: 'Optional, Configure bulk_message_request_threshold
splitting threshold size (default: -1 unlimited)'
format: int32
type: integer
caFile:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,13 @@ spec:
elasticsearch:
description: out_es plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -869,6 +876,13 @@ spec:
elasticsearchDataStream:
description: out_es datastreams plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -2195,8 +2209,8 @@ spec:
the rollover index to be created (default: default)'
type: string
bulkMessageRequestThreshold:
description: 'Optional, Configure bulk_message request splitting
threshold size (default: -1 unlimited)'
description: 'Optional, Configure bulk_message_request_threshold
splitting threshold size (default: -1 unlimited)'
format: int32
type: integer
caFile:
Expand Down
18 changes: 16 additions & 2 deletions charts/fluent-operator/crds/fluentd.fluent.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,13 @@ spec:
elasticsearch:
description: out_es plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -867,6 +874,13 @@ spec:
elasticsearchDataStream:
description: out_es datastreams plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -2193,8 +2207,8 @@ spec:
the rollover index to be created (default: default)'
type: string
bulkMessageRequestThreshold:
description: 'Optional, Configure bulk_message request splitting
threshold size (default: -1 unlimited)'
description: 'Optional, Configure bulk_message_request_threshold
splitting threshold size (default: -1 unlimited)'
format: int32
type: integer
caFile:
Expand Down
18 changes: 16 additions & 2 deletions charts/fluent-operator/crds/fluentd.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,13 @@ spec:
elasticsearch:
description: out_es plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -867,6 +874,13 @@ spec:
elasticsearchDataStream:
description: out_es datastreams plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -2193,8 +2207,8 @@ spec:
the rollover index to be created (default: default)'
type: string
bulkMessageRequestThreshold:
description: 'Optional, Configure bulk_message request splitting
threshold size (default: -1 unlimited)'
description: 'Optional, Configure bulk_message_request_threshold
splitting threshold size (default: -1 unlimited)'
format: int32
type: integer
caFile:
Expand Down
18 changes: 16 additions & 2 deletions config/crd/bases/fluentd.fluent.io_clusteroutputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,13 @@ spec:
elasticsearch:
description: out_es plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -868,6 +875,13 @@ spec:
elasticsearchDataStream:
description: out_es datastreams plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -2194,8 +2208,8 @@ spec:
the rollover index to be created (default: default)'
type: string
bulkMessageRequestThreshold:
description: 'Optional, Configure bulk_message request splitting
threshold size (default: -1 unlimited)'
description: 'Optional, Configure bulk_message_request_threshold
splitting threshold size (default: -1 unlimited)'
format: int32
type: integer
caFile:
Expand Down
18 changes: 16 additions & 2 deletions config/crd/bases/fluentd.fluent.io_outputs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,13 @@ spec:
elasticsearch:
description: out_es plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -868,6 +875,13 @@ spec:
elasticsearchDataStream:
description: out_es datastreams plugin
properties:
bulkMessageRequestThreshold:
description: |-
Optional, Configure bulk_message_request_threshold splitting threshold size.
Default value is -1 (unlimited).
If a bulk message exceeds this threshold, the request is split into multiple smaller requests.
format: int32
type: integer
caFile:
description: Optional, Absolute path to CA certificate file
type: string
Expand Down Expand Up @@ -2194,8 +2208,8 @@ spec:
the rollover index to be created (default: default)'
type: string
bulkMessageRequestThreshold:
description: 'Optional, Configure bulk_message request splitting
threshold size (default: -1 unlimited)'
description: 'Optional, Configure bulk_message_request_threshold
splitting threshold size (default: -1 unlimited)'
format: int32
type: integer
caFile:
Expand Down
1 change: 1 addition & 0 deletions docs/plugins/fluentd/output/es.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Elasticsearch defines the parameters for out_es output plugin
| ilmPolicy | Optional, Specify ILM policy contents as Hash | *string |
| ilmPolicyOverride | Optional, Specify whether overwriting ilm policy or not | *bool |
| logEs400Reason | Optional, Enable logging of 400 reason without enabling debug log level | *bool |
| bulkMessageRequestThreshold | Optional, Configure bulk_message request splitting threshold size. Default value is -1 (unlimited). If a bulk message exceeds this threshold, the request is split into multiple smaller requests. | *int32 |
# Elasticsearch


Expand Down
Loading