diff --git a/apis/fluentd/v1alpha1/plugins/output/es.go b/apis/fluentd/v1alpha1/plugins/output/es.go index 8a0e94d1f..521713235 100644 --- a/apis/fluentd/v1alpha1/plugins/output/es.go +++ b/apis/fluentd/v1alpha1/plugins/output/es.go @@ -66,6 +66,10 @@ type ElasticsearchCommon struct { IlmPolicyOverwrite *bool `json:"ilmPolicyOverride,omitempty"` // Optional, Enable logging of 400 reason without enabling debug log level LogEs400Reason *bool `json:"logEs400Reason,omitempty"` + // Optional, Configure bulk_message_request_threshold splitting threshold size. + // Default value is -1 (unlimited). + // If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + BulkMessageRequestThreshold *int32 `json:"bulkMessageRequestThreshold,omitempty"` } type Elasticsearch struct { diff --git a/apis/fluentd/v1alpha1/plugins/output/opensearch.go b/apis/fluentd/v1alpha1/plugins/output/opensearch.go index 69b5ea996..d2e1ab5aa 100644 --- a/apis/fluentd/v1alpha1/plugins/output/opensearch.go +++ b/apis/fluentd/v1alpha1/plugins/output/opensearch.go @@ -111,7 +111,7 @@ type Opensearch struct { IgnoreExceptions *string `json:"ignoreExceptions,omitempty"` // Optional, Backup chunk when ignore exception occurs (default: true) ExceptionBackup *bool `json:"exceptionBackup,omitempty"` - // Optional, Configure bulk_message request splitting threshold size (default: -1 unlimited) + // Optional, Configure bulk_message_request_threshold splitting threshold size (default: -1 unlimited) BulkMessageRequestThreshold *int32 `json:"bulkMessageRequestThreshold,omitempty"` // Optional, Specify the application name for the rollover index to be created (default: default) ApplicationName *string `json:"applicationName,omitempty"` diff --git a/apis/fluentd/v1alpha1/plugins/output/types.go b/apis/fluentd/v1alpha1/plugins/output/types.go index 74ab1df8c..dc80bb33c 100644 --- a/apis/fluentd/v1alpha1/plugins/output/types.go +++ b/apis/fluentd/v1alpha1/plugins/output/types.go @@ -396,6 +396,7 @@ func (o *Output) elasticsearchPluginCommon(cmn *ElasticsearchCommon, parent *par params.InsertPairs(parent, "ilm_policy", cmn.IlmPolicy) params.InsertPairs(parent, "ilm_policy_overwrite", cmn.IlmPolicyOverwrite) params.InsertPairs(parent, "log_es_400_reason", cmn.LogEs400Reason) + params.InsertPairs(parent, "bulk_message_request_threshold", cmn.BulkMessageRequestThreshold) return parent, nil } diff --git a/apis/fluentd/v1alpha1/tests/expected/fluentd-cluster-cfg-output-es-bulk-threshold.cfg b/apis/fluentd/v1alpha1/tests/expected/fluentd-cluster-cfg-output-es-bulk-threshold.cfg new file mode 100644 index 000000000..35ed11e13 --- /dev/null +++ b/apis/fluentd/v1alpha1/tests/expected/fluentd-cluster-cfg-output-es-bulk-threshold.cfg @@ -0,0 +1,26 @@ + + @type forward + bind 0.0.0.0 + port 24224 + + + @id main + @type label_router + + @label @a2170d34e9940ec56d328100e375c43e + + namespaces default,kube-system + + + + \ No newline at end of file diff --git a/apis/fluentd/v1alpha1/tests/helper_test.go b/apis/fluentd/v1alpha1/tests/helper_test.go index 6d8bd09ab..b4d9fa283 100644 --- a/apis/fluentd/v1alpha1/tests/helper_test.go +++ b/apis/fluentd/v1alpha1/tests/helper_test.go @@ -45,6 +45,11 @@ func Test_ClusterCfgOutput2ES(t *testing.T) { testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ES}, "./expected/fluentd-cluster-cfg-output-es.cfg", false) } +func Test_ClusterCfgOutput2ESBulkThreshold(t *testing.T) { + sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) + testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ESBulkThreshold}, "./expected/fluentd-cluster-cfg-output-es-bulk-threshold.cfg", false) +} + func Test_ClusterCfgOutput2ESDataStream(t *testing.T) { sl := plugins.NewSecretLoader(nil, Fluentd.Namespace, logr.Logger{}) testClusterConfigWithFiltersAndOutputs(t, sl, Fluentd, &FluentdClusterFluentdConfig1, []fluentdv1alpha1.ClusterFilter{}, []fluentdv1alpha1.ClusterOutput{FluentdclusterOutput2ESDataStream}, "./expected/fluentd-cluster-cfg-output-es-data-stream.cfg", false) diff --git a/apis/fluentd/v1alpha1/tests/tools.go b/apis/fluentd/v1alpha1/tests/tools.go index 75b9a24ff..ec805dcd0 100644 --- a/apis/fluentd/v1alpha1/tests/tools.go +++ b/apis/fluentd/v1alpha1/tests/tools.go @@ -367,6 +367,24 @@ spec: logstashPrefix: ks-logstash-log ` + FluentdclusterOutput2ESBulkThreshold fluentdv1alpha1.ClusterOutput + FluentdclusterOutput2ESBulkThresholdRaw = ` +apiVersion: fluentd.fluent.io/v1alpha1 +kind: ClusterOutput +metadata: + name: fluentd-output-es-bulk + labels: + output.fluentd.fluent.io/enabled: "true" +spec: + outputs: + - elasticsearch: + host: elasticsearch-logging-data.kubesphere-logging-system.svc + port: 9200 + logstashFormat: true + logstashPrefix: ks-logstash-log + bulkMessageRequestThreshold: 20000000 +` + FluentdclusterOutput2ESDataStream fluentdv1alpha1.ClusterOutput FluentdclusterOutput2ESDataStreamRaw = ` apiVersion: fluentd.fluent.io/v1alpha1 @@ -1226,6 +1244,7 @@ func init() { MustParseIntoObject(FluentdClusterOutputBufferRaw, &FluentdClusterOutputBuffer) MustParseIntoObject(FluentdClusterOutputMemoryBufferRaw, &FluentdClusterOutputMemoryBuffer) MustParseIntoObject(FluentdclusterOutput2ESRaw, &FluentdclusterOutput2ES) + MustParseIntoObject(FluentdclusterOutput2ESBulkThresholdRaw, &FluentdclusterOutput2ESBulkThreshold) MustParseIntoObject(FluentdclusterOutput2ESDataStreamRaw, &FluentdclusterOutput2ESDataStream) MustParseIntoObject(FluentdclusterOutput2CopyESDataStreamRaw, &FluentdclusterOutput2CopyESDataStream) MustParseIntoObject(FluentdOutput2ES1Raw, &FluentdOutput2ES1) diff --git a/charts/fluent-operator-fluentd-crds/templates/fluentd.fluent.io_clusteroutputs.yaml b/charts/fluent-operator-fluentd-crds/templates/fluentd.fluent.io_clusteroutputs.yaml index 3f6490dc5..fc7e82cad 100644 --- a/charts/fluent-operator-fluentd-crds/templates/fluentd.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator-fluentd-crds/templates/fluentd.fluent.io_clusteroutputs.yaml @@ -573,6 +573,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -869,6 +876,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -2195,8 +2209,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: diff --git a/charts/fluent-operator-fluentd-crds/templates/fluentd.fluent.io_outputs.yaml b/charts/fluent-operator-fluentd-crds/templates/fluentd.fluent.io_outputs.yaml index a08362479..f65e97ae2 100644 --- a/charts/fluent-operator-fluentd-crds/templates/fluentd.fluent.io_outputs.yaml +++ b/charts/fluent-operator-fluentd-crds/templates/fluentd.fluent.io_outputs.yaml @@ -573,6 +573,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -869,6 +876,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -2195,8 +2209,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: diff --git a/charts/fluent-operator/crds/fluentd.fluent.io_clusteroutputs.yaml b/charts/fluent-operator/crds/fluentd.fluent.io_clusteroutputs.yaml index d6936259e..dcb77c818 100644 --- a/charts/fluent-operator/crds/fluentd.fluent.io_clusteroutputs.yaml +++ b/charts/fluent-operator/crds/fluentd.fluent.io_clusteroutputs.yaml @@ -571,6 +571,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -867,6 +874,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -2193,8 +2207,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: diff --git a/charts/fluent-operator/crds/fluentd.fluent.io_outputs.yaml b/charts/fluent-operator/crds/fluentd.fluent.io_outputs.yaml index a63d2f6b2..80e280353 100644 --- a/charts/fluent-operator/crds/fluentd.fluent.io_outputs.yaml +++ b/charts/fluent-operator/crds/fluentd.fluent.io_outputs.yaml @@ -571,6 +571,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -867,6 +874,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -2193,8 +2207,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: diff --git a/config/crd/bases/fluentd.fluent.io_clusteroutputs.yaml b/config/crd/bases/fluentd.fluent.io_clusteroutputs.yaml index 5d0e2ecb2..89c5cd6d8 100644 --- a/config/crd/bases/fluentd.fluent.io_clusteroutputs.yaml +++ b/config/crd/bases/fluentd.fluent.io_clusteroutputs.yaml @@ -572,6 +572,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -868,6 +875,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -2194,8 +2208,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: diff --git a/config/crd/bases/fluentd.fluent.io_outputs.yaml b/config/crd/bases/fluentd.fluent.io_outputs.yaml index 73b4ac5b7..0e088484a 100644 --- a/config/crd/bases/fluentd.fluent.io_outputs.yaml +++ b/config/crd/bases/fluentd.fluent.io_outputs.yaml @@ -572,6 +572,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -868,6 +875,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -2194,8 +2208,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: diff --git a/docs/plugins/fluentd/output/es.md b/docs/plugins/fluentd/output/es.md index f68ac583d..449e4b345 100644 --- a/docs/plugins/fluentd/output/es.md +++ b/docs/plugins/fluentd/output/es.md @@ -34,6 +34,7 @@ Elasticsearch defines the parameters for out_es output plugin | ilmPolicy | Optional, Specify ILM policy contents as Hash | *string | | ilmPolicyOverride | Optional, Specify whether overwriting ilm policy or not | *bool | | logEs400Reason | Optional, Enable logging of 400 reason without enabling debug log level | *bool | +| bulkMessageRequestThreshold | Optional, Configure bulk_message_request_threshold splitting threshold size. Default value is -1 (unlimited). If a bulk message exceeds this threshold, the request is split into multiple smaller requests. | *int32 | # Elasticsearch diff --git a/docs/plugins/fluentd/output/opensearch.md b/docs/plugins/fluentd/output/opensearch.md index 7700bdf76..9428e81fd 100644 --- a/docs/plugins/fluentd/output/opensearch.md +++ b/docs/plugins/fluentd/output/opensearch.md @@ -57,7 +57,7 @@ Opensearch defines the parameters for out_opensearch plugin | suppressDocWrap | Optional, Suppress doc_wrap (default: false) | *bool | | ignoreExceptions | Optional, List of exception classes to ignore | *string | | exceptionBackup | Optional, Backup chunk when ignore exception occurs (default: true) | *bool | -| bulkMessageRequestThreshold | Optional, Configure bulk_message request splitting threshold size (default: -1 unlimited) | *int32 | +| bulkMessageRequestThreshold | Optional, Configure bulk_message_request_threshold splitting threshold size (default: -1 unlimited) | *int32 | | applicationName | Optional, Specify the application name for the rollover index to be created (default: default) | *string | | indexDatePattern | Optional, Specify the index date pattern for creating a rollover index (default: now/d) | *string | | useLegacyTemplate | Optional, Use legacy template or not (default: false for composable templates) | *bool | diff --git a/manifests/setup/fluent-operator-crd.yaml b/manifests/setup/fluent-operator-crd.yaml index ae57f7756..fbc15cc7c 100644 --- a/manifests/setup/fluent-operator-crd.yaml +++ b/manifests/setup/fluent-operator-crd.yaml @@ -9186,6 +9186,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -9482,6 +9489,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -10808,8 +10822,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: @@ -39259,6 +39273,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -39555,6 +39576,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -40881,8 +40909,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: diff --git a/manifests/setup/setup.yaml b/manifests/setup/setup.yaml index 014d3e6f3..7286bb113 100644 --- a/manifests/setup/setup.yaml +++ b/manifests/setup/setup.yaml @@ -9186,6 +9186,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -9482,6 +9489,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -10808,8 +10822,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: @@ -39259,6 +39273,13 @@ spec: elasticsearch: description: out_es plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -39555,6 +39576,13 @@ spec: elasticsearchDataStream: description: out_es datastreams plugin properties: + bulkMessageRequestThreshold: + description: |- + Optional, Configure bulk_message_request_threshold splitting threshold size. + Default value is -1 (unlimited). + If a bulk message exceeds this threshold, the request is split into multiple smaller requests. + format: int32 + type: integer caFile: description: Optional, Absolute path to CA certificate file type: string @@ -40881,8 +40909,8 @@ spec: the rollover index to be created (default: default)' type: string bulkMessageRequestThreshold: - description: 'Optional, Configure bulk_message request splitting - threshold size (default: -1 unlimited)' + description: 'Optional, Configure bulk_message_request_threshold + splitting threshold size (default: -1 unlimited)' format: int32 type: integer caFile: