From 7016ee6e1b9ec4023dd1b706baf18a6ac0b12309 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Mon, 22 Jun 2026 12:17:41 +0300 Subject: [PATCH] vmcluster: support storage node pools --- api/operator/v1beta1/vmcluster_types.go | 201 +++++++--- api/operator/v1beta1/vmextra_types.go | 56 +++ api/operator/v1beta1/vmextra_types_test.go | 140 +++++++ api/operator/v1beta1/zz_generated.deepcopy.go | 32 ++ config/crd/overlay/crd.descriptionless.yaml | 21 + config/crd/overlay/crd.yaml | 48 +++ docs/CHANGELOG.md | 8 +- docs/api.md | 19 +- docs/resources/vmcluster.md | 116 ++++++ .../operator/factory/build/build.go | 66 --- .../operator/factory/build/build_test.go | 147 ------- .../operator/factory/build/cluster.go | 26 +- .../operator/factory/vmauth/vmusers_config.go | 2 +- .../operator/factory/vmcluster/vmcluster.go | 375 ++++++++++++------ .../factory/vmcluster/vmcluster_pools.go | 173 ++++++++ .../factory/vmcluster/vmcluster_test.go | 160 ++++++++ .../operator/factory/vmdistributed/util.go | 4 +- internal/converter/converter.go | 3 +- 18 files changed, 1202 insertions(+), 395 deletions(-) create mode 100644 internal/controller/operator/factory/vmcluster/vmcluster_pools.go diff --git a/api/operator/v1beta1/vmcluster_types.go b/api/operator/v1beta1/vmcluster_types.go index e72a19374..b21416885 100644 --- a/api/operator/v1beta1/vmcluster_types.go +++ b/api/operator/v1beta1/vmcluster_types.go @@ -94,6 +94,18 @@ type VMClusterSpec struct { // See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#automatic-vmstorage-discovery // +optional Discovery *VMClusterDiscovery `json:"discovery,omitempty"` + + // Pools defines named groups of vmstorage (and optionally vminsert) components. + // Each pool gets its own StatefulSet and headless Service named --. + // Top-level vmstorage and vminsert specs act as defaults; pool specs override them field-by-field. + // vmselect queries all pools using the pool name as a storage group name (-storageNode=/). + // When pools are defined the top-level vmstorage is not deployed; pools replace it entirely. + // The top-level vminsert is deployed as a shared insert across all pools only when no pool + // defines its own vminsert; as soon as any pool has a dedicated vminsert the top-level one is skipped. + // +optional + // +listType=map + // +listMapKey=name + Pools []VMClusterPool `json:"pools,omitempty"` } // VMClusterDiscovery configures automatic vmstorage node discovery for vminsert and vmselect. @@ -128,7 +140,16 @@ func (d *VMClusterDiscovery) enabled() bool { return d != nil && d.Enabled } -func (d *VMClusterDiscovery) validate() error { +func (d *VMClusterDiscovery) validate(license *License) error { + if !d.enabled() { + return nil + } + if !license.IsProvided() { + return fmt.Errorf("discovery requires a valid license key, see https://docs.victoriametrics.com/victoriametrics/enterprise/") + } + if err := license.validate(); err != nil { + return err + } if len(d.Filter) > 0 { if _, err := regexp.Compile(d.Filter); err != nil { return fmt.Errorf("discovery.filter is not a valid regexp: %w", err) @@ -488,6 +509,11 @@ type VMStorage struct { // it can be overwritten with component specific image.tag value. // +optional ComponentVersion string `json:"componentVersion,omitempty"` + // RetentionPeriod overrides the cluster-level retentionPeriod for this storage instance. + // Useful when using Pools to implement multi-retention setups. + // +optional + // +kubebuilder:validation:Pattern:="^[0-9]+(h|d|w|y)?$" + RetentionPeriod string `json:"retentionPeriod,omitempty"` // PodMetadata configures Labels and Annotations which are propagated to the VMStorage pods. PodMetadata *EmbeddedObjectMetadata `json:"podMetadata,omitempty"` // LogFormat for VMStorage to be configured with. @@ -707,6 +733,54 @@ func (cr *VMCluster) GetRemoteWriteURL() string { return fmt.Sprintf("%s%s", insertURL, BuildPathWithPrefixFlag(cr.Spec.VMInsert.ExtraArgs, "/insert/multitenant/prometheus/api/v1/write")) } +func (vms *VMStorage) validate(license *License, clusterRetentionPeriod string) error { + if vms.VMBackup != nil { + if err := vms.VMBackup.validate(license); err != nil { + return err + } + } + retention := clusterRetentionPeriod + if vms.RetentionPeriod != "" { + retention = vms.RetentionPeriod + } + if err := vms.RetentionFilters.validate(license, retention); err != nil { + return err + } + if vms.HPA != nil { + if vms.HPA.Behaviour != nil && vms.HPA.Behaviour.ScaleDown != nil { + return fmt.Errorf("scaledown HPA behavior is not supported") + } + if err := vms.HPA.Validate(); err != nil { + return err + } + } + if vms.VPA != nil { + if err := vms.VPA.Validate(); err != nil { + return err + } + } + if vms.RollingUpdateStrategyBehavior != nil { + if err := vms.RollingUpdateStrategyBehavior.Validate(); err != nil { + return err + } + } + return vms.Validate() +} + +func (vmi *VMInsert) validate() error { + if vmi.HPA != nil { + if err := vmi.HPA.Validate(); err != nil { + return err + } + } + if vmi.VPA != nil { + if err := vmi.VPA.Validate(); err != nil { + return err + } + } + return vmi.Validate() +} + func (cr *VMCluster) Validate() error { if MustSkipCRValidation(cr) { return nil @@ -742,17 +816,7 @@ func (cr *VMCluster) Validate() error { if vmi.ServiceSpec != nil && vmi.ServiceSpec.Name == name { return fmt.Errorf(".serviceSpec.Name cannot be equal to prefixed name=%q", name) } - if vmi.HPA != nil { - if err := vmi.HPA.Validate(); err != nil { - return err - } - } - if vmi.VPA != nil { - if err := vmi.VPA.Validate(); err != nil { - return err - } - } - if err := vmi.Validate(); err != nil { + if err := vmi.validate(); err != nil { return fmt.Errorf("vminsert: %w", err) } } @@ -765,28 +829,7 @@ func (cr *VMCluster) Validate() error { if vms.ServiceSpec != nil && vms.ServiceSpec.Name == name { return fmt.Errorf(".serviceSpec.Name cannot be equal to prefixed name=%q", name) } - if cr.Spec.VMStorage.VMBackup != nil { - if err := cr.Spec.VMStorage.VMBackup.validate(cr.Spec.License); err != nil { - return err - } - } - if err := vms.RetentionFilters.validate(cr.Spec.License, cr.Spec.RetentionPeriod); err != nil { - return err - } - if vms.RollingUpdateStrategyBehavior != nil { - if err := vms.RollingUpdateStrategyBehavior.Validate(); err != nil { - return fmt.Errorf("vmstorage: %w", err) - } - } - if vms.HPA != nil && vms.HPA.Behaviour != nil && vms.HPA.Behaviour.ScaleDown != nil { - return fmt.Errorf("vmstorage scaledown HPA behavior is not supported") - } - if vms.VPA != nil { - if err := vms.VPA.Validate(); err != nil { - return err - } - } - if err := vms.Validate(); err != nil { + if err := vms.validate(cr.Spec.License, cr.Spec.RetentionPeriod); err != nil { return fmt.Errorf("vmstorage: %w", err) } } @@ -807,34 +850,66 @@ func (cr *VMCluster) Validate() error { if cr.Spec.VMSelect != nil { vmselectDiscovery = cr.Spec.VMSelect.Discovery.OrDefault(cr.Spec.Discovery) } - if vminsertDiscovery.enabled() || vmselectDiscovery.enabled() { - if !cr.Spec.License.IsProvided() { - return fmt.Errorf("discovery requires a valid license key, see https://docs.victoriametrics.com/victoriametrics/enterprise/") - } - if err := cr.Spec.License.validate(); err != nil { - return err - } - } if vminsertDiscovery.enabled() { if cr.Spec.VMStorage != nil && len(cr.Spec.VMStorage.MaintenanceInsertNodeIDs) > 0 { return fmt.Errorf("maintenanceInsertNodeIDs cannot be used when vminsert discovery is enabled") } - if err := vminsertDiscovery.validate(); err != nil { - return fmt.Errorf("vminsert: %w", err) - } + } + if err := vminsertDiscovery.validate(cr.Spec.License); err != nil { + return fmt.Errorf("vminsert: %w", err) } if vmselectDiscovery.enabled() { if cr.Spec.VMStorage != nil && len(cr.Spec.VMStorage.MaintenanceSelectNodeIDs) > 0 { return fmt.Errorf("maintenanceSelectNodeIDs cannot be used when vmselect discovery is enabled") } - if err := vmselectDiscovery.validate(); err != nil { - return fmt.Errorf("vmselect: %w", err) + } + if err := vmselectDiscovery.validate(cr.Spec.License); err != nil { + return fmt.Errorf("vmselect: %w", err) + } + + poolNames := make(map[string]struct{}, len(cr.Spec.Pools)) + for i, pool := range cr.Spec.Pools { + if !poolNameRe.MatchString(pool.Name) { + return fmt.Errorf("pools[%d].name %q is invalid: must match ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$", i, pool.Name) + } + if _, dup := poolNames[pool.Name]; dup { + return fmt.Errorf("pools[%d].name %q is duplicated", i, pool.Name) + } + poolNames[pool.Name] = struct{}{} + if pool.VMStorage != nil { + vms := pool.VMStorage.DeepCopy() + if cr.Spec.VMStorage != nil { + if err := MergeDeep(vms, cr.Spec.VMStorage, true); err != nil { + return fmt.Errorf("pools[%d] vmstorage merge: %w", i, err) + } + } + if err := vms.validate(cr.Spec.License, cr.Spec.RetentionPeriod); err != nil { + return fmt.Errorf("pools[%d] vmstorage: %w", i, err) + } + } + if pool.VMInsert != nil { + vmi := pool.VMInsert.DeepCopy() + if cr.Spec.VMInsert != nil { + if err := MergeDeep(vmi, cr.Spec.VMInsert, true); err != nil { + return fmt.Errorf("pools[%d] vminsert merge: %w", i, err) + } + } + if err := vmi.validate(); err != nil { + return fmt.Errorf("pools[%d] vminsert: %w", i, err) + } + poolDiscovery := vmi.Discovery.OrDefault(cr.Spec.Discovery) + if err := poolDiscovery.validate(cr.Spec.License); err != nil { + return fmt.Errorf("pools[%d] vminsert: %w", i, err) + } } } return nil } +// poolNameRe validates pool names: lowercase alphanumeric with interior hyphens. +var poolNameRe = regexp.MustCompile(`^[a-z0-9]([a-z0-9-]*[a-z0-9])?$`) + // AvailableStorageNodeIDs returns ids of the storage nodes for the provided component func (cr *VMCluster) AvailableStorageNodeIDs(kind ClusterComponent) []int32 { var result []int32 @@ -1184,3 +1259,35 @@ func (cr *VMAuthLoadBalancerSpec) UseTLS() bool { func (cr *VMAuthLoadBalancerSpec) GetMetricsPath() string { return BuildPathWithPrefixFlag(cr.ExtraArgs, metricsPath) } + +// VMClusterPool defines a named group of vmstorage (and optionally vminsert) components +// within a VMCluster. Each pool gets its own StatefulSet and headless Service. +// +k8s:openapi-gen=true +type VMClusterPool struct { + // Name is the unique identifier for this pool within the cluster. + // Used as a suffix for generated resource names and as a storage group name in vmselect. + // Must be a lowercase alphanumeric DNS label; hyphens allowed in the interior. + // +kubebuilder:validation:Pattern:="^[a-z0-9]([a-z0-9-]*[a-z0-9])?$" + // +kubebuilder:validation:MaxLength=32 + Name string `json:"name"` + // VMStorage defines pool-specific vmstorage configuration. + // Each field overrides the corresponding field in the top-level vmstorage spec. + // Fields absent here inherit from the top-level vmstorage. + // RetentionPeriod on VMStorage overrides the cluster-level retentionPeriod for this pool. + // +optional + // +kubebuilder:validation:Schemaless + // +kubebuilder:pruning:PreserveUnknownFields + VMStorage *VMStorage `json:"vmstorage,omitempty"` + // VMInsert defines a dedicated vminsert for this pool. + // Each field overrides the corresponding field in the top-level vminsert spec. + // When nil, the top-level shared vminsert writes to this pool's storage nodes as well. + // +optional + // +kubebuilder:validation:Schemaless + // +kubebuilder:pruning:PreserveUnknownFields + VMInsert *VMInsert `json:"vminsert,omitempty"` +} + +// PoolPrefixedName returns the Kubernetes resource name for the given component in a pool. +func (cr *VMCluster) PoolPrefixedName(kind ClusterComponent, poolName string) string { + return ClusterPrefixedName(kind, cr.Name, "vm", false) + "-" + poolName +} diff --git a/api/operator/v1beta1/vmextra_types.go b/api/operator/v1beta1/vmextra_types.go index 2b84efb84..7a96ad9bb 100644 --- a/api/operator/v1beta1/vmextra_types.go +++ b/api/operator/v1beta1/vmextra_types.go @@ -1763,3 +1763,59 @@ func (bs *BytesString) String() string { } return string(*bs) } + +func mergeMapsRecursive(baseMap, overrideMap map[string]any) { + if len(overrideMap) == 0 { + return + } + for key, overrideValue := range overrideMap { + if baseVal, ok := baseMap[key]; ok { + if baseMapNested, isBaseMap := baseVal.(map[string]any); isBaseMap { + if overrideMapNested, isOverrideMap := overrideValue.(map[string]any); isOverrideMap { + mergeMapsRecursive(baseMapNested, overrideMapNested) + continue + } + } + } + baseMap[key] = overrideValue + } +} + +// MergeDeep merges an override object into a base one using JSON round-trip. +// Fields present in the override overwrite corresponding fields in the base. +// When reverse is true the roles are swapped: base fills absent fields in override +// (useful when the override should win and the base provides defaults). +func MergeDeep[T comparable](base, override T, reverse bool) error { + var zero T + if override == zero { + return nil + } + baseJSON, err := json.Marshal(base) + if err != nil { + return fmt.Errorf("failed to marshal base spec: %w", err) + } + overrideJSON, err := json.Marshal(override) + if err != nil { + return fmt.Errorf("failed to marshal override spec: %w", err) + } + var baseMap map[string]any + if err := json.Unmarshal(baseJSON, &baseMap); err != nil { + return fmt.Errorf("failed to unmarshal base spec to map: %w", err) + } + var overrideMap map[string]any + if err := json.Unmarshal(overrideJSON, &overrideMap); err != nil { + return fmt.Errorf("failed to unmarshal override spec to map: %w", err) + } + if reverse { + baseMap, overrideMap = overrideMap, baseMap + } + mergeMapsRecursive(baseMap, overrideMap) + mergedJSON, err := json.Marshal(baseMap) + if err != nil { + return fmt.Errorf("failed to marshal merged spec map: %w", err) + } + if err := json.Unmarshal(mergedJSON, base); err != nil { + return fmt.Errorf("failed to unmarshal merged spec JSON: %w", err) + } + return nil +} diff --git a/api/operator/v1beta1/vmextra_types_test.go b/api/operator/v1beta1/vmextra_types_test.go index 8c1bfdb3f..7e3206980 100644 --- a/api/operator/v1beta1/vmextra_types_test.go +++ b/api/operator/v1beta1/vmextra_types_test.go @@ -346,3 +346,143 @@ func TestCommonAppsParamsValidate(t *testing.T) { TerminationGracePeriodSeconds: ptr.To[int64](15), }, true) } + +func TestDeepMerge(t *testing.T) { + type opts struct { + override *VMClusterSpec + validate func(base, merged *VMClusterSpec, err error) + } + f := func(oss ...opts) { + t.Helper() + base := &VMClusterSpec{ + ClusterVersion: "v1.0.0", + ServiceAccountName: "base", + RetentionPeriod: "30d", + VMSelect: &VMSelect{ + CommonAppsParams: CommonAppsParams{ + ReplicaCount: ptr.To(int32(1)), + ExtraArgs: map[string]string{"keep": "x", "override": "old"}, + }, + }, + VMInsert: &VMInsert{ + CommonAppsParams: CommonAppsParams{ + ReplicaCount: ptr.To(int32(1)), + ExtraArgs: map[string]string{"insert-arg": "1"}, + }, + }, + } + merged := base.DeepCopy() + for _, o := range oss { + o.validate(base, merged, MergeDeep(merged, o.override, false)) + } + } + + // with extra args override + f(opts{ + override: &VMClusterSpec{ + ClusterVersion: "v1.2.3", + VMSelect: &VMSelect{ + CommonAppsParams: CommonAppsParams{ + ReplicaCount: ptr.To(int32(3)), + ExtraArgs: map[string]string{"override": "new", "add": "y"}, + }, + }, + ServiceAccountName: "zone-sa", + }, + validate: func(base, merged *VMClusterSpec, err error) { + assert.NoError(t, err) + assert.Equal(t, "v1.2.3", merged.ClusterVersion) + assert.Equal(t, "zone-sa", merged.ServiceAccountName) + if !assert.NotNil(t, merged.VMSelect) || !assert.NotNil(t, merged.VMSelect.ReplicaCount) { + return + } + assert.Equal(t, int32(3), *merged.VMSelect.ReplicaCount) + assert.Equal(t, "x", merged.VMSelect.ExtraArgs["keep"]) + assert.Equal(t, "new", merged.VMSelect.ExtraArgs["override"]) + assert.Equal(t, "y", merged.VMSelect.ExtraArgs["add"]) + if !assert.NotNil(t, merged.VMInsert) || !assert.NotNil(t, merged.VMInsert.ReplicaCount) { + return + } + assert.Equal(t, int32(1), *merged.VMInsert.ReplicaCount) + assert.Equal(t, "1", merged.VMInsert.ExtraArgs["insert-arg"]) + }, + }) + + // with nil override spec + f(opts{ + validate: func(base, merged *VMClusterSpec, err error) { + assert.NoError(t, err) + assert.Equal(t, base, merged) + }, + }) + + // with empty override spec + f(opts{ + override: &VMClusterSpec{}, + validate: func(base, merged *VMClusterSpec, err error) { + assert.NoError(t, err) + assert.Equal(t, base, merged) + }, + }) + + // with override spec that modifies top-level fields + f(opts{ + override: &VMClusterSpec{ + ClusterVersion: "v2.0.0", + ServiceAccountName: "global-sa", + }, + validate: func(_, merged *VMClusterSpec, err error) { + assert.Equal(t, "v2.0.0", merged.ClusterVersion) + assert.Equal(t, "global-sa", merged.ServiceAccountName) + assert.Equal(t, "30d", merged.RetentionPeriod) + }, + }) + + // multiple overrides applied in sequence + f(opts{ + override: &VMClusterSpec{ + ClusterVersion: "v2.0.0", + ServiceAccountName: "global-sa", + }, + validate: func(_, merged *VMClusterSpec, err error) { + assert.NoError(t, err) + assert.Equal(t, "v2.0.0", merged.ClusterVersion) + assert.Equal(t, "global-sa", merged.ServiceAccountName) + assert.Equal(t, "30d", merged.RetentionPeriod) + }, + }, opts{ + override: &VMClusterSpec{ + RetentionPeriod: "10d", + ClusterVersion: "v3.0.0", + }, + validate: func(_, merged *VMClusterSpec, err error) { + assert.NoError(t, err) + assert.Equal(t, "v3.0.0", merged.ClusterVersion) + assert.Equal(t, "global-sa", merged.ServiceAccountName) + assert.Equal(t, "10d", merged.RetentionPeriod) + }, + }) +} + +func TestMergeMapsRecursive(t *testing.T) { + base := map[string]any{ + "a": map[string]any{ + "b": "keep", + "c": "override-me", + }, + "d": "root-keep", + } + override := map[string]any{ + "a": map[string]any{ + "c": "new", + "z": "added", + }, + "e": "root-added", + } + mergeMapsRecursive(base, override) + assert.Equal(t, "keep", base["a"].(map[string]any)["b"]) + assert.Equal(t, "new", base["a"].(map[string]any)["c"]) + assert.Equal(t, "added", base["a"].(map[string]any)["z"]) + assert.Equal(t, "root-keep", base["d"]) + assert.Equal(t, "root-added", base["e"]) +} diff --git a/api/operator/v1beta1/zz_generated.deepcopy.go b/api/operator/v1beta1/zz_generated.deepcopy.go index 90854863a..d0e6c2a65 100644 --- a/api/operator/v1beta1/zz_generated.deepcopy.go +++ b/api/operator/v1beta1/zz_generated.deepcopy.go @@ -6065,6 +6065,31 @@ func (in *VMClusterList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *VMClusterPool) DeepCopyInto(out *VMClusterPool) { + *out = *in + if in.VMStorage != nil { + in, out := &in.VMStorage, &out.VMStorage + *out = new(VMStorage) + (*in).DeepCopyInto(*out) + } + if in.VMInsert != nil { + in, out := &in.VMInsert, &out.VMInsert + *out = new(VMInsert) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VMClusterPool. +func (in *VMClusterPool) DeepCopy() *VMClusterPool { + if in == nil { + return nil + } + out := new(VMClusterPool) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VMClusterSpec) DeepCopyInto(out *VMClusterSpec) { *out = *in @@ -6119,6 +6144,13 @@ func (in *VMClusterSpec) DeepCopyInto(out *VMClusterSpec) { *out = new(VMClusterDiscovery) **out = **in } + if in.Pools != nil { + in, out := &in.Pools, &out.Pools + *out = make([]VMClusterPool, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VMClusterSpec. diff --git a/config/crd/overlay/crd.descriptionless.yaml b/config/crd/overlay/crd.descriptionless.yaml index 7d1ac8f28..95a6b370e 100644 --- a/config/crd/overlay/crd.descriptionless.yaml +++ b/config/crd/overlay/crd.descriptionless.yaml @@ -15726,6 +15726,24 @@ spec: type: object paused: type: boolean + pools: + items: + properties: + name: + maxLength: 32 + pattern: ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$ + type: string + vminsert: + x-kubernetes-preserve-unknown-fields: true + vmstorage: + x-kubernetes-preserve-unknown-fields: true + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map replicationFactor: format: int32 type: integer @@ -17681,6 +17699,9 @@ spec: - retention type: object type: array + retentionPeriod: + pattern: ^[0-9]+(h|d|w|y)?$ + type: string revisionHistoryLimitCount: format: int32 type: integer diff --git a/config/crd/overlay/crd.yaml b/config/crd/overlay/crd.yaml index dfda679e6..eaf883f69 100644 --- a/config/crd/overlay/crd.yaml +++ b/config/crd/overlay/crd.yaml @@ -30640,6 +30640,48 @@ spec: Paused If set to true all actions on the underlying managed objects are not going to be performed, except for delete actions. type: boolean + pools: + description: |- + Pools defines named groups of vmstorage (and optionally vminsert) components. + Each pool gets its own StatefulSet and headless Service named --. + Top-level vmstorage and vminsert specs act as defaults; pool specs override them field-by-field. + vmselect queries all pools using the pool name as a storage group name (-storageNode=/). + When pools are defined the top-level vmstorage is not deployed; pools replace it entirely. + The top-level vminsert is deployed as a shared insert across all pools only when no pool + defines its own vminsert; as soon as any pool has a dedicated vminsert the top-level one is skipped. + items: + description: |- + VMClusterPool defines a named group of vmstorage (and optionally vminsert) components + within a VMCluster. Each pool gets its own StatefulSet and headless Service. + properties: + name: + description: |- + Name is the unique identifier for this pool within the cluster. + Used as a suffix for generated resource names and as a storage group name in vmselect. + Must be a lowercase alphanumeric DNS label; hyphens allowed in the interior. + maxLength: 32 + pattern: ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$ + type: string + vminsert: + description: |- + VMInsert defines a dedicated vminsert for this pool. + Each field overrides the corresponding field in the top-level vminsert spec. + When nil, the top-level shared vminsert writes to this pool's storage nodes as well. + x-kubernetes-preserve-unknown-fields: true + vmstorage: + description: |- + VMStorage defines pool-specific vmstorage configuration. + Each field overrides the corresponding field in the top-level vmstorage spec. + Fields absent here inherit from the top-level vmstorage. + RetentionPeriod on VMStorage overrides the cluster-level retentionPeriod for this pool. + x-kubernetes-preserve-unknown-fields: true + required: + - name + type: object + type: array + x-kubernetes-list-map-keys: + - name + x-kubernetes-list-type: map replicationFactor: description: |- ReplicationFactor defines how many copies of data make among @@ -34520,6 +34562,12 @@ spec: - retention type: object type: array + retentionPeriod: + description: |- + RetentionPeriod overrides the cluster-level retentionPeriod for this storage instance. + Useful when using Pools to implement multi-retention setups. + pattern: ^[0-9]+(h|d|w|y)?$ + type: string revisionHistoryLimitCount: description: |- The number of old ReplicaSets to retain to allow rollback in deployment or diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 924791de2..86327e4de 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -14,12 +14,14 @@ aliases: ## tip * Dependency: [vmoperator](https://docs.victoriametrics.com/operator/): Updated default versions for VM apps to [v1.146.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.146.0) version +* Dependency: [vmoperator](https://docs.victoriametrics.com/operator/): Updated default versions for VL apps to [v1.51.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.51.0). +* Dependency: [vmoperator](https://docs.victoriametrics.com/operator/): Updated default versions for VT apps to [v0.9.3](https://github.com/VictoriaMetrics/VictoriaTraces/releases/tag/v0.9.3) version. + +* FEATURE: [vmcluster](https://docs.victoriametrics.com/operator/resources/vmcluster/): add `spec.pools` support for multi-retention and tiered-storage setups. Each named pool runs isolated `vmstorage` nodes and optionally a dedicated `vminsert`, all queried transparently by `vmselect` as named storage groups. See [#741](https://github.com/VictoriaMetrics/operator/issues/741). + * BUGFIX: [config-reloader](https://docs.victoriametrics.com/operator/): fix possible panic on Secret watch events when the informer's local cache fell out of sync and Kubernetes delivered a stale tombstone entry instead of the Secret object. The config-reloader now unwraps tombstones correctly and logs an error for any other unexpected types. * BUGFIX: [vmoperator](https://docs.victoriametrics.com/operator/): switch default app probes to `tcpSocket` `startupProbe` when TLS is enabled on the managed HTTP endpoint. This avoids broken kubelet `httpGet` checks against TLS and mTLS-protected workloads. See [#1824](https://github.com/VictoriaMetrics/operator/issues/1824). -* Dependency: [vmoperator](https://docs.victoriametrics.com/operator/): Updated default versions for VL apps to [v1.51.0](https://github.com/VictoriaMetrics/VictoriaLogs/releases/tag/v1.51.0). -* Dependency: [vmoperator](https://docs.victoriametrics.com/operator/): Updated default versions for VT apps to [v0.9.3](https://github.com/VictoriaMetrics/VictoriaTraces/releases/tag/v0.9.3) version. - ## [v0.72.0](https://github.com/VictoriaMetrics/operator/releases/tag/v0.72.0) **Release date:** 15 June 2026 diff --git a/docs/api.md b/docs/api.md index 3a393520a..c02446e80 100644 --- a/docs/api.md +++ b/docs/api.md @@ -4370,6 +4370,19 @@ Appears in: [VMClusterSpec](#vmclusterspec), [VMInsert](#vminsert), [VMSelect](# | filter#
_string_ | _(Optional)_
Filter is an optional regexp filter applied to discovered vmstorage addresses.
Only addresses matching the filter are used; non-matching addresses are ignored. | | interval#
_string_ | _(Optional)_
Interval is the interval for refreshing the vmstorage node list resolved from DNS SRV records.
The minimum supported value is 1s.
Defaults to 2s if not set. | +#### VMClusterPool + +VMClusterPool defines a named group of vmstorage (and optionally vminsert) components +within a VMCluster. Each pool gets its own StatefulSet and headless Service. + +Appears in: [VMClusterSpec](#vmclusterspec) + +| Field | Description | +| --- | --- | +| name#
_string_ | _(Required)_
Name is the unique identifier for this pool within the cluster.
Used as a suffix for generated resource names and as a storage group name in vmselect.
Must be a lowercase alphanumeric DNS label; hyphens allowed in the interior. | +| vminsert#
_[VMInsert](#vminsert)_ | _(Optional)_
VMInsert defines a dedicated vminsert for this pool.
Each field overrides the corresponding field in the top-level vminsert spec.
When nil, the top-level shared vminsert writes to this pool's storage nodes as well. | +| vmstorage#
_[VMStorage](#vmstorage)_ | _(Optional)_
VMStorage defines pool-specific vmstorage configuration.
Each field overrides the corresponding field in the top-level vmstorage spec.
Fields absent here inherit from the top-level vmstorage.
RetentionPeriod on VMStorage overrides the cluster-level retentionPeriod for this pool. | + #### VMClusterSpec VMClusterSpec defines the desired state of VMCluster @@ -4386,6 +4399,7 @@ Appears in: [VMCluster](#vmcluster), [VMDistributedZoneCluster](#vmdistributedzo | license#
_[License](#license)_ | _(Optional)_
License allows to configure license key to be used for enterprise features.
Using license key is supported starting from VictoriaMetrics v1.94.0.
See [here](https://docs.victoriametrics.com/victoriametrics/enterprise/) | | managedMetadata#
_[ManagedObjectsMetadata](#managedobjectsmetadata)_ | _(Required)_
ManagedMetadata defines metadata that will be added to the all objects
created by operator for the given CustomResource | | paused#
_boolean_ | _(Optional)_
Paused If set to true all actions on the underlying managed objects are not
going to be performed, except for delete actions. | +| pools#
_[VMClusterPool](#vmclusterpool) array_ | _(Optional)_
Pools defines named groups of vmstorage (and optionally vminsert) components.
Each pool gets its own StatefulSet and headless Service named --.
Top-level vmstorage and vminsert specs act as defaults; pool specs override them field-by-field.
vmselect queries all pools using the pool name as a storage group name (-storageNode=/).
When pools are defined the top-level vmstorage is not deployed; pools replace it entirely.
The top-level vminsert is deployed as a shared insert across all pools only when no pool
defines its own vminsert; as soon as any pool has a dedicated vminsert the top-level one is skipped. | | replicationFactor#
_integer_ | _(Optional)_
ReplicationFactor defines how many copies of data make among
distinct storage nodes | | requestsLoadBalancer#
_[VMAuthLoadBalancer](#vmauthloadbalancer)_ | _(Required)_
RequestsLoadBalancer configures load-balancing for vminsert and vmselect requests.
It helps to evenly spread load across pods.
Usually it's not possible with Kubernetes TCP-based services.
See more [here](https://docs.victoriametrics.com/operator/resources/vmcluster/#requests-load-balancing) | | retentionPeriod#
_string_ | _(Optional)_
RetentionPeriod defines how long to retain stored metrics, specified as a duration (e.g., "1d", "1w", "1m").
Data with timestamps outside the RetentionPeriod is automatically deleted. The minimum allowed value is 1d, or 24h.
The default value is 1 (one month).
See [retention](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#retention) docs for details. | @@ -4397,7 +4411,7 @@ Appears in: [VMCluster](#vmcluster), [VMDistributedZoneCluster](#vmdistributedzo #### VMInsert -Appears in: [VMClusterSpec](#vmclusterspec) +Appears in: [VMClusterPool](#vmclusterpool), [VMClusterSpec](#vmclusterspec) | Field | Description | | --- | --- | @@ -5016,7 +5030,7 @@ Appears in: [VMStaticScrape](#vmstaticscrape) #### VMStorage -Appears in: [VMClusterSpec](#vmclusterspec) +Appears in: [VMClusterPool](#vmclusterpool), [VMClusterSpec](#vmclusterspec) | Field | Description | | --- | --- | @@ -5059,6 +5073,7 @@ Appears in: [VMClusterSpec](#vmclusterspec) | replicaCount#
_integer_ | _(Optional)_
ReplicaCount is the expected size of the Application. | | resources#
_[ResourceRequirements](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.35/#resourcerequirements-v1-core)_ | _(Optional)_
Resources container resource request and limits, https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
if not defined default resources from operator config will be used | | retentionFilters#
_[RetentionFiltersConfig](#retentionfiltersconfig)_ | _(Optional)_
RetentionFilters defines per-series retention filters for vmstorage.
Requires enterprise license. See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#retention-filters | +| retentionPeriod#
_string_ | _(Optional)_
RetentionPeriod overrides the cluster-level retentionPeriod for this storage instance.
Useful when using Pools to implement multi-retention setups. | | revisionHistoryLimitCount#
_integer_ | _(Optional)_
The number of old ReplicaSets to retain to allow rollback in deployment or
maximum number of revisions that will be maintained in the Deployment revision history.
Has no effect at StatefulSets
Defaults to 10. | | rollingUpdateStrategy#
_[StatefulSetUpdateStrategyType](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.35/#statefulsetupdatestrategytype-v1-apps)_ | _(Optional)_
RollingUpdateStrategy defines strategy for application updates
Default is OnDelete, in this case operator handles update process
Can be changed for RollingUpdate | | rollingUpdateStrategyBehavior#
_[StatefulSetUpdateStrategyBehavior](#statefulsetupdatestrategybehavior)_ | _(Optional)_
RollingUpdateStrategyBehavior defines customized behavior for rolling updates.
It applies if the RollingUpdateStrategy is set to OnDelete, which is the default. | diff --git a/docs/resources/vmcluster.md b/docs/resources/vmcluster.md index 449490d21..729e83e2b 100644 --- a/docs/resources/vmcluster.md +++ b/docs/resources/vmcluster.md @@ -863,6 +863,122 @@ spec: enabled: false ``` +## Storage pools + +`spec.pools` allows you to run multiple isolated groups of `vmstorage` nodes within a single `VMCluster`. +Each pool has its own name, StatefulSet, headless Service, and optional PodDisruptionBudget / HPA / VPA. +`vmselect` queries all pools simultaneously and merges results transparently, so existing queries require no changes. + +The primary use case is **multi-retention**: assign a different `retentionPeriod` to each pool so that some metrics expire sooner than others, without deploying separate clusters. + +When `spec.pools` is defined the top-level `spec.vmstorage` is **not** deployed; pools replace it entirely. + +### Pool configuration and inheritance + +Each pool entry has a `name` and an optional `vmstorage` override: + +| Field | Description | +|-------|-------------| +| `name` | Unique identifier within the cluster. Used as a resource name suffix and as the storage group name in `vmselect`. Must match `^[a-z0-9]([a-z0-9-]*[a-z0-9])?$`, max 32 chars. | +| `vmstorage` | Pool-specific `vmstorage` configuration. Each field overrides the corresponding field in the top-level `spec.vmstorage`; absent fields inherit from the top-level. | +| `vminsert` | Optional dedicated `vminsert` for this pool. When nil, the shared top-level `vminsert` writes to this pool's storage nodes as well. | + +`vmstorage.retentionPeriod` inside a pool overrides `spec.retentionPeriod` for that pool, enabling per-pool retention without enterprise retention filters. + +### Resource naming + +Pool resources follow the pattern `vm--`: + +| Resource | Example | +|----------|---------| +| `vmstorage` StatefulSet | `vmstorage-mycluster-hot` | +| `vmstorage` headless Service | `vmstorage-mycluster-hot` | +| `vminsert` Deployment (dedicated) | `vminsert-mycluster-hot` | + +All pool pods carry an extra label `app.kubernetes.io/pool=` to keep per-pool selectors disjoint. + +### Multi-retention example + +Two pools with different retention periods sharing a single `vminsert` and `vmselect`: + +```yaml +apiVersion: operator.victoriametrics.com/v1beta1 +kind: VMCluster +metadata: + name: example-pools +spec: + retentionPeriod: "30d" # default, used as fallback + vmselect: + replicaCount: 2 + vminsert: + replicaCount: 2 # shared across both pools + # top-level vmstorage sets shared defaults inherited by all pools + vmstorage: + replicaCount: 3 + storage: + volumeClaimTemplate: + spec: + resources: + requests: + storage: 50Gi + pools: + - name: hot + vmstorage: + retentionPeriod: "7d" + - name: cold + vmstorage: + replicaCount: 5 + retentionPeriod: "365d" + storage: + volumeClaimTemplate: + spec: + resources: + requests: + storage: 2Ti +``` + +This creates: +- `vmstorage-example-pools-hot` StatefulSet with 3 replicas (inherited) and 7-day retention. +- `vmstorage-example-pools-cold` StatefulSet with 5 replicas (overridden) and 365-day retention. +- One shared `vminsert-example-pools` Deployment writing to both pools. +- `vmselect` queries both pools and returns merged results. + +### Pool with a dedicated vminsert + +When a pool defines `vminsert`, it gets its own isolated insert path. +The shared top-level `vminsert` is **not** deployed when every pool has a dedicated one. +Use this when pools need different write-path configurations (e.g. different stream aggregation rules or resource limits): + +```yaml +apiVersion: operator.victoriametrics.com/v1beta1 +kind: VMCluster +metadata: + name: example-dedicated-insert +spec: + retentionPeriod: "30d" + vmselect: + replicaCount: 2 + # top-level vminsert sets shared defaults inherited by pool-level inserts + vminsert: + replicaCount: 2 + resources: + limits: + cpu: "1" + memory: 512Mi + vmstorage: + replicaCount: 3 + pools: + - name: short + vmstorage: + retentionPeriod: "7d" + vminsert: + replicaCount: 4 # override: more replicas for high-throughput writes + - name: long + vmstorage: + retentionPeriod: "2y" + vminsert: {} # dedicated insert inheriting all top-level vminsert defaults +``` + ## Examples ### Minimal example without persistence diff --git a/internal/controller/operator/factory/build/build.go b/internal/controller/operator/factory/build/build.go index 0ce42e7e4..3b7cad238 100644 --- a/internal/controller/operator/factory/build/build.go +++ b/internal/controller/operator/factory/build/build.go @@ -3,7 +3,6 @@ package build import ( "bytes" "compress/gzip" - "encoding/json" "fmt" "io" "path" @@ -298,68 +297,3 @@ func GunzipConfig(data []byte) ([]byte, error) { defer gr.Close() return io.ReadAll(gr) } - -// mergeMapsRecursive deeply merges overrideMap into baseMap. -// It handles nested maps (which correspond to nested structs after JSON unmarshal). -// Values from overrideMap overwrite values in baseMap. -func mergeMapsRecursive(baseMap, overrideMap map[string]any) { - if len(overrideMap) == 0 { - return - } - for key, overrideValue := range overrideMap { - if baseVal, ok := baseMap[key]; ok { - if baseMapNested, isBaseMap := baseVal.(map[string]any); isBaseMap { - if overrideMapNested, isOverrideMap := overrideValue.(map[string]any); isOverrideMap { - // Both are nested maps, recurse - mergeMapsRecursive(baseMapNested, overrideMapNested) - continue - } - } - } - baseMap[key] = overrideValue - } -} - -// MergeDeep merges an override object into a base one. -// Fields present in the override will overwrite corresponding fields in the base. -func MergeDeep[T comparable](base, override T, reverse bool) error { - var zero T - if override == zero { - return nil - } - - baseJSON, err := json.Marshal(base) - if err != nil { - return fmt.Errorf("failed to marshal base spec: %w", err) - } - overrideJSON, err := json.Marshal(override) - if err != nil { - return fmt.Errorf("failed to marshal override spec: %w", err) - } - - var baseMap map[string]any - if err := json.Unmarshal(baseJSON, &baseMap); err != nil { - return fmt.Errorf("failed to unmarshal base spec to map: %w", err) - } - var overrideMap map[string]any - if err := json.Unmarshal(overrideJSON, &overrideMap); err != nil { - return fmt.Errorf("failed to unmarshal override spec to map: %w", err) - } - - // Perform a deep merge: fields from overrideMap recursively overwrite corresponding fields in baseMap. - // If an override value is explicitly nil, it signifies the removal or nullification of that field. - if reverse { - baseMap, overrideMap = overrideMap, baseMap - } - mergeMapsRecursive(baseMap, overrideMap) - mergedSpecJSON, err := json.Marshal(baseMap) - if err != nil { - return fmt.Errorf("failed to marshal merged spec map: %w", err) - } - - if err := json.Unmarshal(mergedSpecJSON, base); err != nil { - return fmt.Errorf("failed to unmarshal merged spec JSON: %w", err) - } - - return nil -} diff --git a/internal/controller/operator/factory/build/build_test.go b/internal/controller/operator/factory/build/build_test.go index 9855a1b61..244e2b235 100644 --- a/internal/controller/operator/factory/build/build_test.go +++ b/internal/controller/operator/factory/build/build_test.go @@ -107,150 +107,3 @@ func TestGzipGunzipConfig(t *testing.T) { // binary data f([]int32{1, 2, 3, 4}) } - -func TestMergeMapsRecursive(t *testing.T) { - base := map[string]any{ - "a": map[string]any{ - "b": "keep", - "c": "override-me", - }, - "d": "root-keep", - } - override := map[string]any{ - "a": map[string]any{ - "c": "new", - "z": "added", - }, - "e": "root-added", - } - // initial merge - mergeMapsRecursive(base, override) - assert.Equal(t, "keep", base["a"].(map[string]any)["b"]) - assert.Equal(t, "new", base["a"].(map[string]any)["c"]) - assert.Equal(t, "added", base["a"].(map[string]any)["z"]) - assert.Equal(t, "root-keep", base["d"]) - assert.Equal(t, "root-added", base["e"]) -} - -func TestDeepMerge(t *testing.T) { - type opts struct { - override *vmv1beta1.VMClusterSpec - validate func(base, merged *vmv1beta1.VMClusterSpec, err error) - } - f := func(oss ...opts) { - t.Helper() - base := &vmv1beta1.VMClusterSpec{ - ClusterVersion: "v1.0.0", - ServiceAccountName: "base", - RetentionPeriod: "30d", - VMSelect: &vmv1beta1.VMSelect{ - CommonAppsParams: vmv1beta1.CommonAppsParams{ - ReplicaCount: ptr.To(int32(1)), - ExtraArgs: map[string]string{"keep": "x", "override": "old"}, - }, - }, - VMInsert: &vmv1beta1.VMInsert{ - CommonAppsParams: vmv1beta1.CommonAppsParams{ - ReplicaCount: ptr.To(int32(1)), - ExtraArgs: map[string]string{"insert-arg": "1"}, - }, - }, - } - merged := base.DeepCopy() - for _, o := range oss { - o.validate(base, merged, MergeDeep(merged, o.override, false)) - } - } - - // with extra args override - f(opts{ - override: &vmv1beta1.VMClusterSpec{ - ClusterVersion: "v1.2.3", - VMSelect: &vmv1beta1.VMSelect{ - CommonAppsParams: vmv1beta1.CommonAppsParams{ - ReplicaCount: ptr.To(int32(3)), - ExtraArgs: map[string]string{"override": "new", "add": "y"}, - }, - }, - ServiceAccountName: "zone-sa", - }, - validate: func(base, merged *vmv1beta1.VMClusterSpec, err error) { - assert.NoError(t, err) - - // top-level - assert.Equal(t, "v1.2.3", merged.ClusterVersion) - assert.Equal(t, "zone-sa", merged.ServiceAccountName) - - // nested merge - if !assert.NotNil(t, merged.VMSelect) || !assert.NotNil(t, merged.VMSelect.ReplicaCount) { - return - } - assert.Equal(t, int32(3), *merged.VMSelect.ReplicaCount) - assert.Equal(t, "x", merged.VMSelect.ExtraArgs["keep"]) - assert.Equal(t, "new", merged.VMSelect.ExtraArgs["override"]) - assert.Equal(t, "y", merged.VMSelect.ExtraArgs["add"]) - - // untouched subtree - if !assert.NotNil(t, merged.VMInsert) || !assert.NotNil(t, merged.VMInsert.ReplicaCount) { - return - } - assert.Equal(t, int32(1), *merged.VMInsert.ReplicaCount) - assert.Equal(t, "1", merged.VMInsert.ExtraArgs["insert-arg"]) - }, - }) - - // with nil override spec - f(opts{ - validate: func(base, merged *vmv1beta1.VMClusterSpec, err error) { - assert.NoError(t, err) - assert.Equal(t, base, merged) - }, - }) - - // with empty override spec - f(opts{ - override: &vmv1beta1.VMClusterSpec{}, - validate: func(base, merged *vmv1beta1.VMClusterSpec, err error) { - assert.NoError(t, err) - assert.Equal(t, base, merged) - }, - }) - - // with override spec that modifies top-level fields - f(opts{ - override: &vmv1beta1.VMClusterSpec{ - ClusterVersion: "v2.0.0", - ServiceAccountName: "global-sa", - }, - validate: func(_, merged *vmv1beta1.VMClusterSpec, err error) { - assert.Equal(t, "v2.0.0", merged.ClusterVersion) - assert.Equal(t, "global-sa", merged.ServiceAccountName) - assert.Equal(t, "30d", merged.RetentionPeriod) - }, - }) - - // multiple overrides - f(opts{ - override: &vmv1beta1.VMClusterSpec{ - ClusterVersion: "v2.0.0", - ServiceAccountName: "global-sa", - }, - validate: func(_, merged *vmv1beta1.VMClusterSpec, err error) { - assert.NoError(t, err) - assert.Equal(t, "v2.0.0", merged.ClusterVersion) - assert.Equal(t, "global-sa", merged.ServiceAccountName) - assert.Equal(t, "30d", merged.RetentionPeriod) - }, - }, opts{ - override: &vmv1beta1.VMClusterSpec{ - RetentionPeriod: "10d", - ClusterVersion: "v3.0.0", - }, - validate: func(_, merged *vmv1beta1.VMClusterSpec, err error) { - assert.NoError(t, err) - assert.Equal(t, "v3.0.0", merged.ClusterVersion) // Cluster-specific override should take precedence - assert.Equal(t, "global-sa", merged.ServiceAccountName) // From global override, unchanged by cluster override - assert.Equal(t, "10d", merged.RetentionPeriod) // From cluster override - }, - }) -} diff --git a/internal/controller/operator/factory/build/cluster.go b/internal/controller/operator/factory/build/cluster.go index 76e917979..bd6c8a063 100644 --- a/internal/controller/operator/factory/build/cluster.go +++ b/internal/controller/operator/factory/build/cluster.go @@ -25,11 +25,12 @@ type ChildBuilder struct { kind vmv1beta1.ClusterComponent finalLabels map[string]string selectorLabels map[string]string + prefixedName string } // PrefixedName implements build.svcBuilderArgs interface func (b *ChildBuilder) PrefixedName() string { - return b.ParentOpts.PrefixedName(b.kind) + return b.prefixedName } // FinalLabels implements build.svcBuilderArgs interface @@ -55,11 +56,30 @@ func (b *ChildBuilder) SetSelectorLabels(ls map[string]string) { b.selectorLabels = ls } +// poolLabelKey is the label added to all pool-scoped resources. +const poolLabelKey = "app.kubernetes.io/pool" + func NewChildBuilder(cr ParentOpts, kind vmv1beta1.ClusterComponent) *ChildBuilder { + return NewPoolBuilder(cr, kind, "") +} + +// NewPoolBuilder is like NewChildBuilder but scopes the builder to a named pool. +// PrefixedName appends "-", and both SelectorLabels and FinalLabels include +// the pool label so that multiple pools in the same namespace have non-overlapping selectors. +func NewPoolBuilder(cr ParentOpts, kind vmv1beta1.ClusterComponent, poolName string) *ChildBuilder { + selectorLabels := cr.SelectorLabels(kind) + finalLabels := cr.FinalLabels(kind) + name := cr.PrefixedName(kind) + if poolName != "" { + name += "-" + poolName + selectorLabels[poolLabelKey] = poolName + finalLabels[poolLabelKey] = poolName + } return &ChildBuilder{ ParentOpts: cr, kind: kind, - finalLabels: cr.FinalLabels(kind), - selectorLabels: cr.SelectorLabels(kind), + finalLabels: finalLabels, + selectorLabels: selectorLabels, + prefixedName: name, } } diff --git a/internal/controller/operator/factory/vmauth/vmusers_config.go b/internal/controller/operator/factory/vmauth/vmusers_config.go index 50767658b..385532f87 100644 --- a/internal/controller/operator/factory/vmauth/vmusers_config.go +++ b/internal/controller/operator/factory/vmauth/vmusers_config.go @@ -79,7 +79,7 @@ func (pos *parsedObjects) buildConfig(ctx context.Context, rclient client.Client backends[ref.Name] = ref } else { if r, ok := backends[ref.Name]; ok { - if err := build.MergeDeep(ref, r, true); err != nil { + if err := vmv1beta1.MergeDeep(ref, r, true); err != nil { return fmt.Errorf("failed to merge target refs: %w", err) } } else { diff --git a/internal/controller/operator/factory/vmcluster/vmcluster.go b/internal/controller/operator/factory/vmcluster/vmcluster.go index 4712927dd..7a09652c0 100644 --- a/internal/controller/operator/factory/vmcluster/vmcluster.go +++ b/internal/controller/operator/factory/vmcluster/vmcluster.go @@ -15,6 +15,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" vpav1 "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -81,28 +82,33 @@ func CreateOrUpdate(ctx context.Context, cr *vmv1beta1.VMCluster, rclient client } } - if cr.Spec.VMStorage != nil { + if cr.Spec.VMStorage != nil && len(cr.Spec.Pools) == 0 { if cr.Spec.VMStorage.PodDisruptionBudget != nil { - err := createOrUpdatePodDisruptionBudgetForVMStorage(ctx, rclient, cr, prevCR) + err := createOrUpdatePodDisruptionBudgetForVMStorage(ctx, rclient, cr, prevCR, "") if err != nil { return err } } - if err := createOrUpdateVMStorage(ctx, rclient, cr, prevCR); err != nil { + if err := createOrUpdateVMStorage(ctx, rclient, cr, prevCR, "", owner); err != nil { return err } - - if err := createOrUpdateVMStorageService(ctx, rclient, cr, prevCR); err != nil { + if err := createOrUpdateVMStorageService(ctx, rclient, cr, prevCR, owner, ""); err != nil { return err } - if err := createOrUpdateVMStorageHPA(ctx, rclient, cr, prevCR); err != nil { + if err := createOrUpdateVMStorageHPA(ctx, rclient, cr, prevCR, ""); err != nil { return err } - if err := createOrUpdateVMStorageVPA(ctx, rclient, cr, prevCR); err != nil { + if err := createOrUpdateVMStorageVPA(ctx, rclient, cr, prevCR, ""); err != nil { return err } } + for _, pool := range cr.Spec.Pools { + if err := createOrUpdatePool(ctx, rclient, cr, prevCR, pool, owner); err != nil { + return fmt.Errorf("pool %q: %w", pool.Name, err) + } + } + if cr.Spec.VMSelect != nil { if cr.Spec.VMSelect.PodDisruptionBudget != nil { if err := createOrUpdatePodDisruptionBudgetForVMSelect(ctx, rclient, cr, prevCR); err != nil { @@ -125,22 +131,29 @@ func CreateOrUpdate(ctx context.Context, cr *vmv1beta1.VMCluster, rclient client } } - if cr.Spec.VMInsert != nil { + hasPoolInsert := false + for _, p := range cr.Spec.Pools { + if p.VMInsert != nil { + hasPoolInsert = true + break + } + } + if cr.Spec.VMInsert != nil && !hasPoolInsert { if cr.Spec.VMInsert.PodDisruptionBudget != nil { - if err := createOrUpdatePodDisruptionBudgetForVMInsert(ctx, rclient, cr, prevCR); err != nil { + if err := createOrUpdatePodDisruptionBudgetForVMInsert(ctx, rclient, cr, prevCR, ""); err != nil { return err } } - if err := createOrUpdateVMInsert(ctx, rclient, cr, prevCR); err != nil { + if err := createOrUpdateVMInsert(ctx, rclient, cr, prevCR, "", owner); err != nil { return err } - if err := createOrUpdateVMInsertService(ctx, rclient, cr, prevCR); err != nil { + if err := createOrUpdateVMInsertService(ctx, rclient, cr, prevCR, owner, ""); err != nil { return err } - if err := createOrUpdateVMInsertHPA(ctx, rclient, cr, prevCR); err != nil { + if err := createOrUpdateVMInsertHPA(ctx, rclient, cr, prevCR, ""); err != nil { return err } - if err := createOrUpdateVMInsertVPA(ctx, rclient, cr, prevCR); err != nil { + if err := createOrUpdateVMInsertVPA(ctx, rclient, cr, prevCR, ""); err != nil { return err } } @@ -305,21 +318,22 @@ func createOrUpdateLBProxyService(ctx context.Context, rclient client.Client, cr return nil } -func createOrUpdateVMInsert(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { +func createOrUpdateVMInsert(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, poolName string, owner metav1.OwnerReference) error { var prevDeploy *appsv1.Deployment if prevCR != nil && prevCR.Spec.VMInsert != nil { var err error - prevDeploy, err = genVMInsertSpec(prevCR) + prevDeploy, err = genVMInsertSpec(prevCR, poolName) if err != nil { return fmt.Errorf("cannot generate prev deploy spec: %w", err) } + prevDeploy.OwnerReferences = []metav1.OwnerReference{owner} } - newDeployment, err := genVMInsertSpec(cr) + newDeployment, err := genVMInsertSpec(cr, poolName) if err != nil { return err } - owner := cr.AsOwner() + newDeployment.OwnerReferences = []metav1.OwnerReference{owner} o := reconcile.DeploymentOpts{ PatchSpec: func(existingSpec, newSpec *appsv1.DeploymentSpec) { if cr.Spec.VMInsert.HPA != nil { @@ -331,8 +345,8 @@ func createOrUpdateVMInsert(ctx context.Context, rclient client.Client, cr, prev return reconcile.Deployment(ctx, rclient, newDeployment, prevDeploy, &owner, &o) } -func buildVMInsertService(cr *vmv1beta1.VMCluster) *corev1.Service { - b := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentInsert) +func buildVMInsertService(cr *vmv1beta1.VMCluster, poolName string) *corev1.Service { + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentInsert, poolName) svc := build.Service(b, cr.Spec.VMInsert.Port, func(svc *corev1.Service) { build.AppendInsertPortsToService(cr.Spec.VMInsert.InsertPorts, svc) if cr.Spec.VMInsert.ClusterNativePort != "" { @@ -365,16 +379,17 @@ func buildVMInsertScrape(cr *vmv1beta1.VMCluster, svc *corev1.Service) *vmv1beta return svs } -func createOrUpdateVMInsertService(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { - svc := buildVMInsertService(cr) +func createOrUpdateVMInsertService(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, owner metav1.OwnerReference, poolName string) error { + svc := buildVMInsertService(cr, poolName) + svc.OwnerReferences = []metav1.OwnerReference{owner} var prevSvc, prevAdditionalSvc *corev1.Service if prevCR != nil && prevCR.Spec.VMInsert != nil { - prevSvc = buildVMInsertService(prevCR) + prevSvc = buildVMInsertService(prevCR, poolName) + prevSvc.OwnerReferences = []metav1.OwnerReference{owner} prevAdditionalSvcBase := *prevSvc prevAdditionalSvcBase.Name = prevCR.PrefixedName(vmv1beta1.ClusterComponentInsert) prevAdditionalSvc = build.AdditionalServiceFromDefault(&prevAdditionalSvcBase, prevCR.Spec.VMInsert.ServiceSpec) } - owner := cr.AsOwner() if err := cr.Spec.VMInsert.ServiceSpec.IsSomeAndThen(func(s *vmv1beta1.AdditionalServiceSpec) error { additionalSvcBase := *svc additionalSvcBase.Name = cr.PrefixedName(vmv1beta1.ClusterComponentInsert) @@ -415,23 +430,25 @@ func createOrUpdateVMInsertService(ctx context.Context, rclient client.Client, c return nil } -func createOrUpdateVMStorage(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { +func createOrUpdateVMStorage(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, poolName string, owner metav1.OwnerReference) error { var prevSts *appsv1.StatefulSet if prevCR != nil && prevCR.Spec.VMStorage != nil { var err error - prevSts, err = buildVMStorageSpec(ctx, prevCR) + prevSts, err = buildVMStorageSpec(ctx, prevCR, poolName) if err != nil { return fmt.Errorf("cannot build prev storage spec: %w", err) } + prevSts.OwnerReferences = []metav1.OwnerReference{owner} } - newSts, err := buildVMStorageSpec(ctx, cr) + newSts, err := buildVMStorageSpec(ctx, cr, poolName) if err != nil { return err } + newSts.OwnerReferences = []metav1.OwnerReference{owner} o := reconcile.StatefulSetOpts{ - SelectorLabels: cr.SelectorLabels(vmv1beta1.ClusterComponentStorage), + SelectorLabels: newSts.Spec.Selector.MatchLabels, UpdateBehavior: cr.Spec.VMStorage.RollingUpdateStrategyBehavior, PatchSpec: func(existingSpec, newSpec *appsv1.StatefulSetSpec) { if cr.Spec.VMStorage.HPA != nil { @@ -440,12 +457,11 @@ func createOrUpdateVMStorage(ctx context.Context, rclient client.Client, cr, pre } }, } - owner := cr.AsOwner() return reconcile.StatefulSet(ctx, rclient, newSts, prevSts, &owner, &o) } -func buildVMStorageService(cr *vmv1beta1.VMCluster) *corev1.Service { - b := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentStorage) +func buildVMStorageService(cr *vmv1beta1.VMCluster, poolName string) *corev1.Service { + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentStorage, poolName) return build.Service(b, cr.Spec.VMStorage.Port, func(svc *corev1.Service) { svc.Spec.ClusterIP = "None" svc.Spec.PublishNotReadyAddresses = true @@ -482,14 +498,15 @@ func buildVMStorageScrape(cr *vmv1beta1.VMCluster, svc *corev1.Service) *vmv1bet return build.VMServiceScrape(svc, cr.Spec.VMStorage, "vmbackupmanager") } -func createOrUpdateVMStorageService(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { - svc := buildVMStorageService(cr) +func createOrUpdateVMStorageService(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, owner metav1.OwnerReference, poolName string) error { + svc := buildVMStorageService(cr, poolName) + svc.OwnerReferences = []metav1.OwnerReference{owner} var prevSvc, prevAdditionalSvc *corev1.Service if prevCR != nil && prevCR.Spec.VMStorage != nil { - prevSvc = buildVMStorageService(prevCR) + prevSvc = buildVMStorageService(prevCR, poolName) + prevSvc.OwnerReferences = []metav1.OwnerReference{owner} prevAdditionalSvc = build.AdditionalServiceFromDefault(prevSvc, prevCR.Spec.VMStorage.ServiceSpec) } - owner := cr.AsOwner() if err := cr.Spec.VMStorage.ServiceSpec.IsSomeAndThen(func(s *vmv1beta1.AdditionalServiceSpec) error { additionalSvc := build.AdditionalServiceFromDefault(svc, s) if additionalSvc.Name == svc.Name { @@ -607,21 +624,43 @@ func makePodSpecForVMSelect(cr *vmv1beta1.VMCluster) (*corev1.PodTemplateSpec, e } storageName := cr.PrefixedName(vmv1beta1.ClusterComponentStorage) - if d := cr.Spec.VMSelect.Discovery.OrDefault(cr.Spec.Discovery); d != nil && d.Enabled { - args = append(args, fmt.Sprintf("-storageNode=srv+%s", storageNodeSRVAddr(storageName, cr.Namespace, cr.Spec.VMStorage.VMSelectPort, cr.Spec.ClusterDomainName))) - if d.Interval != "" { - args = append(args, fmt.Sprintf("-storageNode.discoveryInterval=%s", d.Interval)) - } - if d.Filter != "" { - args = append(args, fmt.Sprintf("-storageNode.filter=%s", d.Filter)) + if len(cr.Spec.Pools) == 0 { + if d := cr.Spec.VMSelect.Discovery.OrDefault(cr.Spec.Discovery); d != nil && d.Enabled { + args = append(args, fmt.Sprintf("-storageNode=srv+%s", storageNodeSRVAddr(storageName, cr.Namespace, cr.Spec.VMStorage.VMSelectPort, cr.Spec.ClusterDomainName))) + if d.Interval != "" { + args = append(args, fmt.Sprintf("-storageNode.discoveryInterval=%s", d.Interval)) + } + if d.Filter != "" { + args = append(args, fmt.Sprintf("-storageNode.filter=%s", d.Filter)) + } + } else { + storageNodeFlag := build.NewFlag("-storageNode", "") + storageNodeIds := cr.AvailableStorageNodeIDs(vmv1beta1.ClusterComponentSelect) + for idx, i := range storageNodeIds { + storageNodeFlag.Add(build.PodDNSAddress(storageName, i, cr.Namespace, cr.Spec.VMStorage.VMSelectPort, cr.Spec.ClusterDomainName), idx) + } + args = build.AppendFlagsToArgs(args, len(storageNodeIds), storageNodeFlag) } - } else { - storageNodeFlag := build.NewFlag("-storageNode", "") - storageNodeIds := cr.AvailableStorageNodeIDs(vmv1beta1.ClusterComponentSelect) - for idx, i := range storageNodeIds { - storageNodeFlag.Add(build.PodDNSAddress(storageName, i, cr.Namespace, cr.Spec.VMStorage.VMSelectPort, cr.Spec.ClusterDomainName), idx) + } + + // Pool storage nodes — each pool is exposed as a named storage group. + for _, pool := range cr.Spec.Pools { + poolStr, err := poolStorage(cr, pool) + if err != nil { + return nil, fmt.Errorf("pool %q: cannot build storage spec: %w", pool.Name, err) + } + if poolStr == nil { + continue + } + poolStorageName := cr.PoolPrefixedName(vmv1beta1.ClusterComponentStorage, pool.Name) + if d := cr.Spec.VMSelect.Discovery.OrDefault(cr.Spec.Discovery); d != nil && d.Enabled { + args = append(args, fmt.Sprintf("-storageNode=%s/srv+%s", pool.Name, storageNodeSRVAddr(poolStorageName, cr.Namespace, poolStr.VMSelectPort, cr.Spec.ClusterDomainName))) + } else { + for _, i := range storageNodeIDs(poolStr, vmv1beta1.ClusterComponentSelect) { + addr := build.PodDNSAddress(poolStorageName, i, cr.Namespace, poolStr.VMSelectPort, cr.Spec.ClusterDomainName) + args = append(args, fmt.Sprintf("-storageNode=%s/%s", pool.Name, addr)) + } } - args = build.AppendFlagsToArgs(args, len(storageNodeIds), storageNodeFlag) } // selectNode arg add for deployments without HPA @@ -763,23 +802,26 @@ func createOrUpdatePodDisruptionBudgetForVMSelect(ctx context.Context, rclient c return reconcile.PDB(ctx, rclient, pdb, prevPDB, &owner) } -func genVMInsertSpec(cr *vmv1beta1.VMCluster) (*appsv1.Deployment, error) { - +func genVMInsertSpec(cr *vmv1beta1.VMCluster, poolName string) (*appsv1.Deployment, error) { + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentInsert, poolName) podSpec, err := makePodSpecForVMInsert(cr) if err != nil { return nil, err } + for k, v := range b.SelectorLabels() { + podSpec.Labels[k] = v + } strategyType := appsv1.RollingUpdateDeploymentStrategyType if cr.Spec.VMInsert.UpdateStrategy != nil { strategyType = *cr.Spec.VMInsert.UpdateStrategy } - commonName := cr.PrefixedName(vmv1beta1.ClusterComponentInsert) + commonName := b.PrefixedName() stsSpec := &appsv1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: commonName, Namespace: cr.Namespace, - Labels: cr.FinalLabels(vmv1beta1.ClusterComponentInsert), + Labels: b.FinalLabels(), Annotations: cr.FinalAnnotations(), OwnerReferences: []metav1.OwnerReference{cr.AsOwner()}, }, @@ -792,7 +834,7 @@ func genVMInsertSpec(cr *vmv1beta1.VMCluster) (*appsv1.Deployment, error) { RollingUpdate: cr.Spec.VMInsert.RollingUpdate, }, Selector: &metav1.LabelSelector{ - MatchLabels: cr.SelectorLabels(vmv1beta1.ClusterComponentInsert), + MatchLabels: b.SelectorLabels(), }, Template: *podSpec, }, @@ -822,21 +864,46 @@ func makePodSpecForVMInsert(cr *vmv1beta1.VMCluster) (*corev1.PodTemplateSpec, e } storageName := cr.PrefixedName(vmv1beta1.ClusterComponentStorage) - if d := cr.Spec.VMInsert.Discovery.OrDefault(cr.Spec.Discovery); d != nil && d.Enabled { - args = append(args, fmt.Sprintf("-storageNode=srv+%s", storageNodeSRVAddr(storageName, cr.Namespace, cr.Spec.VMStorage.VMInsertPort, cr.Spec.ClusterDomainName))) - if d.Interval != "" { - args = append(args, fmt.Sprintf("-storageNode.discoveryInterval=%s", d.Interval)) + if len(cr.Spec.Pools) == 0 { + if d := cr.Spec.VMInsert.Discovery.OrDefault(cr.Spec.Discovery); d != nil && d.Enabled { + args = append(args, fmt.Sprintf("-storageNode=srv+%s", storageNodeSRVAddr(storageName, cr.Namespace, cr.Spec.VMStorage.VMInsertPort, cr.Spec.ClusterDomainName))) + if d.Interval != "" { + args = append(args, fmt.Sprintf("-storageNode.discoveryInterval=%s", d.Interval)) + } + if d.Filter != "" { + args = append(args, fmt.Sprintf("-storageNode.filter=%s", d.Filter)) + } + } else { + storageNodeFlag := build.NewFlag("-storageNode", "") + storageNodeIds := cr.AvailableStorageNodeIDs(vmv1beta1.ClusterComponentInsert) + for idx, i := range storageNodeIds { + storageNodeFlag.Add(build.PodDNSAddress(storageName, i, cr.Namespace, cr.Spec.VMStorage.VMInsertPort, cr.Spec.ClusterDomainName), idx) + } + args = build.AppendFlagsToArgs(args, len(storageNodeIds), storageNodeFlag) } - if d.Filter != "" { - args = append(args, fmt.Sprintf("-storageNode.filter=%s", d.Filter)) + } + + // Pool storage nodes for pools using the shared vminsert (no dedicated insert). + for _, pool := range cr.Spec.Pools { + if pool.VMInsert != nil { + continue } - } else { - storageNodeFlag := build.NewFlag("-storageNode", "") - storageNodeIds := cr.AvailableStorageNodeIDs(vmv1beta1.ClusterComponentInsert) - for idx, i := range storageNodeIds { - storageNodeFlag.Add(build.PodDNSAddress(storageName, i, cr.Namespace, cr.Spec.VMStorage.VMInsertPort, cr.Spec.ClusterDomainName), idx) + poolStr, err := poolStorage(cr, pool) + if err != nil { + return nil, fmt.Errorf("pool %q: cannot build storage spec: %w", pool.Name, err) + } + if poolStr == nil { + continue + } + poolStorageName := cr.PoolPrefixedName(vmv1beta1.ClusterComponentStorage, pool.Name) + if d := cr.Spec.VMInsert.Discovery.OrDefault(cr.Spec.Discovery); d != nil && d.Enabled { + args = append(args, fmt.Sprintf("-storageNode=srv+%s", storageNodeSRVAddr(poolStorageName, cr.Namespace, poolStr.VMInsertPort, cr.Spec.ClusterDomainName))) + } else { + for _, i := range storageNodeIDs(poolStr, vmv1beta1.ClusterComponentInsert) { + addr := build.PodDNSAddress(poolStorageName, i, cr.Namespace, poolStr.VMInsertPort, cr.Spec.ClusterDomainName) + args = append(args, fmt.Sprintf("-storageNode=%s", addr)) + } } - args = build.AppendFlagsToArgs(args, len(storageNodeIds), storageNodeFlag) } if cr.Spec.ReplicationFactor != nil { @@ -962,37 +1029,42 @@ func makePodSpecForVMInsert(cr *vmv1beta1.VMCluster) (*corev1.PodTemplateSpec, e return vmInsertPodSpec, nil } -func createOrUpdatePodDisruptionBudgetForVMInsert(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { - b := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentInsert) +func createOrUpdatePodDisruptionBudgetForVMInsert(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, poolName string) error { + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentInsert, poolName) pdb := build.PodDisruptionBudget(b, cr.Spec.VMInsert.PodDisruptionBudget) var prevPDB *policyv1.PodDisruptionBudget if prevCR != nil && prevCR.Spec.VMInsert.PodDisruptionBudget != nil { - b = build.NewChildBuilder(prevCR, vmv1beta1.ClusterComponentInsert) + b = build.NewPoolBuilder(prevCR, vmv1beta1.ClusterComponentInsert, poolName) prevPDB = build.PodDisruptionBudget(b, prevCR.Spec.VMInsert.PodDisruptionBudget) } owner := cr.AsOwner() return reconcile.PDB(ctx, rclient, pdb, prevPDB, &owner) } -func buildVMStorageSpec(ctx context.Context, cr *vmv1beta1.VMCluster) (*appsv1.StatefulSet, error) { - - commonName := cr.PrefixedName(vmv1beta1.ClusterComponentStorage) +func buildVMStorageSpec(ctx context.Context, cr *vmv1beta1.VMCluster, poolName string) (*appsv1.StatefulSet, error) { + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentStorage, poolName) + commonName := b.PrefixedName() podSpec, err := makePodSpecForVMStorage(ctx, cr) if err != nil { return nil, err } + // Merge selector labels into pod template so the STS selector matches its pods. + // For pools this adds the pool label required to keep per-pool selectors disjoint. + for k, v := range b.SelectorLabels() { + podSpec.Labels[k] = v + } stsSpec := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: commonName, Namespace: cr.Namespace, - Labels: cr.FinalLabels(vmv1beta1.ClusterComponentStorage), + Labels: b.FinalLabels(), Annotations: cr.FinalAnnotations(), OwnerReferences: []metav1.OwnerReference{cr.AsOwner()}, }, Spec: appsv1.StatefulSetSpec{ Selector: &metav1.LabelSelector{ - MatchLabels: cr.SelectorLabels(vmv1beta1.ClusterComponentStorage), + MatchLabels: b.SelectorLabels(), }, UpdateStrategy: appsv1.StatefulSetUpdateStrategy{ Type: cr.Spec.VMStorage.RollingUpdateStrategy, @@ -1024,7 +1096,9 @@ func makePodSpecForVMStorage(ctx context.Context, cr *vmv1beta1.VMCluster) (*cor if cfg.EnableTCP6 { args = append(args, "-enableTCP6") } - if cr.Spec.RetentionPeriod != "" { + if rp := cr.Spec.VMStorage.RetentionPeriod; rp != "" { + args = append(args, fmt.Sprintf("-retentionPeriod=%s", rp)) + } else if cr.Spec.RetentionPeriod != "" { args = append(args, fmt.Sprintf("-retentionPeriod=%s", cr.Spec.RetentionPeriod)) } @@ -1232,23 +1306,23 @@ func makePodSpecForVMStorage(ctx context.Context, cr *vmv1beta1.VMCluster) (*cor return vmStoragePodSpec, nil } -func createOrUpdatePodDisruptionBudgetForVMStorage(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { - b := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentStorage) +func createOrUpdatePodDisruptionBudgetForVMStorage(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, poolName string) error { + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentStorage, poolName) pdb := build.PodDisruptionBudget(b, cr.Spec.VMStorage.PodDisruptionBudget) var prevPDB *policyv1.PodDisruptionBudget if prevCR != nil && prevCR.Spec.VMStorage.PodDisruptionBudget != nil { - b = build.NewChildBuilder(prevCR, vmv1beta1.ClusterComponentStorage) + b = build.NewPoolBuilder(prevCR, vmv1beta1.ClusterComponentStorage, poolName) prevPDB = build.PodDisruptionBudget(b, prevCR.Spec.VMStorage.PodDisruptionBudget) } owner := cr.AsOwner() return reconcile.PDB(ctx, rclient, pdb, prevPDB, &owner) } -func createOrUpdateVMInsertHPA(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { +func createOrUpdateVMInsertHPA(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, poolName string) error { if cr.Spec.VMInsert.HPA == nil { return nil } - b := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentInsert) + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentInsert, poolName) targetRef := autoscalingv2.CrossVersionObjectReference{ Name: b.PrefixedName(), Kind: "Deployment", @@ -1257,7 +1331,7 @@ func createOrUpdateVMInsertHPA(ctx context.Context, rclient client.Client, cr, p newHPA := build.HPA(b, targetRef, cr.Spec.VMInsert.HPA) var prevHPA *autoscalingv2.HorizontalPodAutoscaler if prevCR != nil && prevCR.Spec.VMInsert.HPA != nil { - b = build.NewChildBuilder(prevCR, vmv1beta1.ClusterComponentInsert) + b = build.NewPoolBuilder(prevCR, vmv1beta1.ClusterComponentInsert, poolName) prevHPA = build.HPA(b, targetRef, prevCR.Spec.VMInsert.HPA) } owner := cr.AsOwner() @@ -1284,12 +1358,12 @@ func createOrUpdateVMSelectHPA(ctx context.Context, rclient client.Client, cr, p return reconcile.HPA(ctx, rclient, defaultHPA, prevHPA, &owner) } -func createOrUpdateVMStorageHPA(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { +func createOrUpdateVMStorageHPA(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, poolName string) error { hpa := cr.Spec.VMStorage.HPA if hpa == nil { return nil } - b := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentStorage) + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentStorage, poolName) targetRef := autoscalingv2.CrossVersionObjectReference{ Name: b.PrefixedName(), Kind: "StatefulSet", @@ -1298,18 +1372,18 @@ func createOrUpdateVMStorageHPA(ctx context.Context, rclient client.Client, cr, defaultHPA := build.HPA(b, targetRef, hpa) var prevHPA *autoscalingv2.HorizontalPodAutoscaler if prevCR != nil && prevCR.Spec.VMStorage.HPA != nil { - b = build.NewChildBuilder(prevCR, vmv1beta1.ClusterComponentStorage) + b = build.NewPoolBuilder(prevCR, vmv1beta1.ClusterComponentStorage, poolName) prevHPA = build.HPA(b, targetRef, prevCR.Spec.VMStorage.HPA) } owner := cr.AsOwner() return reconcile.HPA(ctx, rclient, defaultHPA, prevHPA, &owner) } -func createOrUpdateVMInsertVPA(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { +func createOrUpdateVMInsertVPA(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, poolName string) error { if cr.Spec.VMInsert.VPA == nil { return nil } - b := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentInsert) + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentInsert, poolName) targetRef := autoscalingv1.CrossVersionObjectReference{ Name: b.PrefixedName(), Kind: "Deployment", @@ -1318,7 +1392,7 @@ func createOrUpdateVMInsertVPA(ctx context.Context, rclient client.Client, cr, p newVPA := build.VPA(b, targetRef, cr.Spec.VMInsert.VPA) var prevVPA *vpav1.VerticalPodAutoscaler if prevCR != nil && prevCR.Spec.VMInsert != nil && prevCR.Spec.VMInsert.VPA != nil { - b = build.NewChildBuilder(prevCR, vmv1beta1.ClusterComponentInsert) + b = build.NewPoolBuilder(prevCR, vmv1beta1.ClusterComponentInsert, poolName) prevVPA = build.VPA(b, targetRef, prevCR.Spec.VMInsert.VPA) } owner := cr.AsOwner() @@ -1345,12 +1419,12 @@ func createOrUpdateVMSelectVPA(ctx context.Context, rclient client.Client, cr, p return reconcile.VPA(ctx, rclient, newVPA, prevVPA, &owner) } -func createOrUpdateVMStorageVPA(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster) error { +func createOrUpdateVMStorageVPA(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, poolName string) error { vpa := cr.Spec.VMStorage.VPA if vpa == nil { return nil } - b := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentStorage) + b := build.NewPoolBuilder(cr, vmv1beta1.ClusterComponentStorage, poolName) targetRef := autoscalingv1.CrossVersionObjectReference{ Name: b.PrefixedName(), Kind: "StatefulSet", @@ -1359,7 +1433,7 @@ func createOrUpdateVMStorageVPA(ctx context.Context, rclient client.Client, cr, newVPA := build.VPA(b, targetRef, vpa) var prevVPA *vpav1.VerticalPodAutoscaler if prevCR != nil && prevCR.Spec.VMStorage != nil && prevCR.Spec.VMStorage.VPA != nil { - b = build.NewChildBuilder(prevCR, vmv1beta1.ClusterComponentStorage) + b = build.NewPoolBuilder(prevCR, vmv1beta1.ClusterComponentStorage, poolName) prevVPA = build.VPA(b, targetRef, prevCR.Spec.VMStorage.VPA) } owner := cr.AsOwner() @@ -1372,31 +1446,62 @@ func deleteOrphaned(ctx context.Context, rclient client.Client, cr *vmv1beta1.VM newInsert := cr.Spec.VMInsert newLB := cr.Spec.RequestsLoadBalancer + hasPools := len(cr.Spec.Pools) > 0 + hasPoolInsert := false + for _, p := range cr.Spec.Pools { + if p.VMInsert != nil { + hasPoolInsert = true + break + } + } + cc := finalize.NewChildCleaner() - if newStorage == nil { - if err := finalize.OnStorageDelete(ctx, rclient, cr, true); err != nil { - return fmt.Errorf("cannot remove orphaned storage resources: %w", err) + + keepStorageResources := func(name string, storage *vmv1beta1.VMStorage) { + cc.KeepService(name) + if storage.ServiceSpec != nil && !storage.ServiceSpec.UseAsDefault { + cc.KeepService(storage.ServiceSpec.NameOrDefault(name)) } - } else { - commonName := cr.PrefixedName(vmv1beta1.ClusterComponentStorage) - if newStorage.PodDisruptionBudget != nil { - cc.KeepPDB(commonName) + if !ptr.Deref(storage.DisableSelfServiceScrape, false) { + cc.KeepScrape(name) } - if newStorage.HPA != nil { - cc.KeepHPA(commonName) + if storage.PodDisruptionBudget != nil { + cc.KeepPDB(name) } - if newStorage.VPA != nil { - cc.KeepVPA(commonName) + if storage.HPA != nil { + cc.KeepHPA(name) } - if !ptr.Deref(newStorage.DisableSelfServiceScrape, false) { - cc.KeepScrape(commonName) + if storage.VPA != nil { + cc.KeepVPA(name) } - cc.KeepService(commonName) - if newStorage.ServiceSpec != nil && !newStorage.ServiceSpec.UseAsDefault { - cc.KeepService(newStorage.ServiceSpec.NameOrDefault(commonName)) + } + keepInsertResources := func(name, scrapeName string, insert *vmv1beta1.VMInsert) { + cc.KeepService(name) + if insert.ServiceSpec != nil && !insert.ServiceSpec.UseAsDefault { + cc.KeepService(insert.ServiceSpec.NameOrDefault(name)) + } + if !ptr.Deref(insert.DisableSelfServiceScrape, false) { + cc.KeepScrape(scrapeName) + } + if insert.PodDisruptionBudget != nil { + cc.KeepPDB(name) + } + if insert.HPA != nil { + cc.KeepHPA(name) + } + if insert.VPA != nil { + cc.KeepVPA(name) } } + if newStorage == nil && !hasPools { + if err := finalize.OnStorageDelete(ctx, rclient, cr, true); err != nil { + return fmt.Errorf("cannot remove orphaned storage resources: %w", err) + } + } else if newStorage != nil { + keepStorageResources(cr.PrefixedName(vmv1beta1.ClusterComponentStorage), newStorage) + } + if newSelect == nil { if err := finalize.OnSelectDelete(ctx, rclient, cr, true); err != nil { return fmt.Errorf("cannot remove orphaned select resources: %w", err) @@ -1426,33 +1531,18 @@ func deleteOrphaned(ctx context.Context, rclient client.Client, cr *vmv1beta1.VM } } - if newInsert == nil { + if newInsert == nil && !hasPoolInsert { if err := finalize.OnInsertDelete(ctx, rclient, cr, true); err != nil { return fmt.Errorf("cannot remove orphaned insert resources: %w", err) } - } else { + } else if newInsert != nil { commonName := cr.PrefixedName(vmv1beta1.ClusterComponentInsert) - if newInsert.PodDisruptionBudget != nil { - cc.KeepPDB(commonName) - } - if newInsert.HPA != nil { - cc.KeepHPA(commonName) - } - if newInsert.VPA != nil { - cc.KeepVPA(commonName) - } - cc.KeepService(commonName) - if newInsert.ServiceSpec != nil && !newInsert.ServiceSpec.UseAsDefault { - cc.KeepService(newInsert.ServiceSpec.NameOrDefault(commonName)) - } scrapeName := commonName if newLB.Enabled && !newLB.DisableInsertBalancing { scrapeName = cr.PrefixedInternalName(vmv1beta1.ClusterComponentInsert) cc.KeepService(scrapeName) } - if !ptr.Deref(newInsert.DisableSelfServiceScrape, false) { - cc.KeepScrape(scrapeName) - } + keepInsertResources(commonName, scrapeName, newInsert) } if newLB.Enabled { commonName := cr.PrefixedName(vmv1beta1.ClusterComponentBalancer) @@ -1482,6 +1572,47 @@ func deleteOrphaned(ctx context.Context, rclient client.Client, cr *vmv1beta1.VM return fmt.Errorf("cannot remove serviceaccount: %w", err) } } + + // Pool resource cleanup: keep active pool services and remove orphaned pool StatefulSets/Deployments. + keepStorageSTSNames := sets.New[string]() + keepInsertDeploymentNames := sets.New[string]() + if newStorage != nil { + keepStorageSTSNames.Insert(cr.PrefixedName(vmv1beta1.ClusterComponentStorage)) + } + if newInsert != nil { + keepInsertDeploymentNames.Insert(cr.PrefixedName(vmv1beta1.ClusterComponentInsert)) + } + for _, pool := range cr.Spec.Pools { + poolStorageName := cr.PoolPrefixedName(vmv1beta1.ClusterComponentStorage, pool.Name) + keepStorageSTSNames.Insert(poolStorageName) + poolStorageItem, err := poolStorage(cr, pool) + if err != nil { + return fmt.Errorf("pool %q: cannot resolve storage: %w", pool.Name, err) + } + if poolStorageItem != nil { + keepStorageResources(poolStorageName, poolStorageItem) + } + if pool.VMInsert != nil { + poolInsertName := cr.PoolPrefixedName(vmv1beta1.ClusterComponentInsert, pool.Name) + keepInsertDeploymentNames.Insert(poolInsertName) + poolInsertItem, err := poolInsert(cr, pool) + if err != nil { + return fmt.Errorf("pool %q: cannot resolve insert: %w", pool.Name, err) + } + if poolInsertItem != nil { + keepInsertResources(poolInsertName, poolInsertName, poolInsertItem) + } + } + } + storageBuilder := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentStorage) + if err := finalize.RemoveOrphanedSTSs(ctx, rclient, storageBuilder, keepStorageSTSNames, true); err != nil { + return fmt.Errorf("cannot remove orphaned pool storage StatefulSets: %w", err) + } + insertBuilder := build.NewChildBuilder(cr, vmv1beta1.ClusterComponentInsert) + if err := finalize.RemoveOrphanedDeployments(ctx, rclient, insertBuilder, keepInsertDeploymentNames, true); err != nil { + return fmt.Errorf("cannot remove orphaned pool insert Deployments: %w", err) + } + return cc.RemoveOrphaned(ctx, rclient, cr) } diff --git a/internal/controller/operator/factory/vmcluster/vmcluster_pools.go b/internal/controller/operator/factory/vmcluster/vmcluster_pools.go new file mode 100644 index 000000000..11713a57e --- /dev/null +++ b/internal/controller/operator/factory/vmcluster/vmcluster_pools.go @@ -0,0 +1,173 @@ +package vmcluster + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + "sigs.k8s.io/controller-runtime/pkg/client" + + vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1" +) + +// createOrUpdatePool reconciles all resources for a single pool in sequence: +// vmstorage StatefulSet → vmstorage Service → vminsert Deployment + Service (if dedicated). +func createOrUpdatePool(ctx context.Context, rclient client.Client, cr, prevCR *vmv1beta1.VMCluster, pool vmv1beta1.VMClusterPool, owner metav1.OwnerReference) error { + view, err := buildPoolView(cr, pool) + if err != nil { + return err + } + var prevView *vmv1beta1.VMCluster + if prevCR != nil { + if prevPool, ok := findPool(prevCR.Spec.Pools, pool.Name); ok { + prevView, err = buildPoolView(prevCR, prevPool) + if err != nil { + return err + } + } + } + + if view.Spec.VMStorage.PodDisruptionBudget != nil { + if err := createOrUpdatePodDisruptionBudgetForVMStorage(ctx, rclient, view, prevView, pool.Name); err != nil { + return fmt.Errorf("vmstorage pdb: %w", err) + } + } + if err := createOrUpdateVMStorage(ctx, rclient, view, prevView, pool.Name, owner); err != nil { + return fmt.Errorf("vmstorage: %w", err) + } + if err := createOrUpdateVMStorageService(ctx, rclient, view, prevView, owner, pool.Name); err != nil { + return fmt.Errorf("vmstorage service: %w", err) + } + if err := createOrUpdateVMStorageHPA(ctx, rclient, view, prevView, pool.Name); err != nil { + return fmt.Errorf("vmstorage hpa: %w", err) + } + if err := createOrUpdateVMStorageVPA(ctx, rclient, view, prevView, pool.Name); err != nil { + return fmt.Errorf("vmstorage vpa: %w", err) + } + + if pool.VMInsert != nil { + if view.Spec.VMInsert.PodDisruptionBudget != nil { + if err := createOrUpdatePodDisruptionBudgetForVMInsert(ctx, rclient, view, prevView, pool.Name); err != nil { + return fmt.Errorf("vminsert pdb: %w", err) + } + } + if err := createOrUpdateVMInsert(ctx, rclient, view, prevView, pool.Name, owner); err != nil { + return fmt.Errorf("vminsert: %w", err) + } + if err := createOrUpdateVMInsertService(ctx, rclient, view, prevView, owner, pool.Name); err != nil { + return fmt.Errorf("vminsert service: %w", err) + } + if err := createOrUpdateVMInsertHPA(ctx, rclient, view, prevView, pool.Name); err != nil { + return fmt.Errorf("vminsert hpa: %w", err) + } + if err := createOrUpdateVMInsertVPA(ctx, rclient, view, prevView, pool.Name); err != nil { + return fmt.Errorf("vminsert vpa: %w", err) + } + } + return nil +} + +// buildPoolView returns a deep copy of cr with its VMStorage, VMInsert, and +// RetentionPeriod substituted with the pool's merged values. +// view.Name stays equal to cr.Name so all label methods produce the correct instance label. +// Pool-specific naming and labels are handled by build.NewPoolBuilder inside the factory functions. +func buildPoolView(cr *vmv1beta1.VMCluster, pool vmv1beta1.VMClusterPool) (*vmv1beta1.VMCluster, error) { + view := cr.DeepCopy() + + storage, err := poolStorage(cr, pool) + if err != nil { + return nil, fmt.Errorf("cannot resolve vmstorage: %w", err) + } + view.Spec.VMStorage = storage + if storage != nil && storage.RetentionPeriod != "" { + view.Spec.RetentionPeriod = storage.RetentionPeriod + } + + if pool.VMInsert != nil { + insert, err := poolInsert(cr, pool) + if err != nil { + return nil, fmt.Errorf("cannot resolve vminsert: %w", err) + } + view.Spec.VMInsert = insert + } else { + view.Spec.VMInsert = nil + } + + view.Spec.VMSelect = nil + view.Spec.Pools = nil + return view, nil +} + +// poolStorage merges the pool's vmstorage over the top-level base using MergeDeep. +// Pool fields take precedence; absent pool fields fall through to the base. +func poolStorage(cr *vmv1beta1.VMCluster, pool vmv1beta1.VMClusterPool) (*vmv1beta1.VMStorage, error) { + base := cr.Spec.VMStorage + if pool.VMStorage == nil { + if base == nil { + return nil, nil + } + return base.DeepCopy(), nil + } + merged := pool.VMStorage.DeepCopy() + if base != nil { + // reverse=true: merged (pool) fields win; base fills absent fields. + if err := vmv1beta1.MergeDeep(merged, base, true); err != nil { + return nil, err + } + } + return merged, nil +} + +// poolInsert merges the pool's vminsert over the top-level base using MergeDeep. +func poolInsert(cr *vmv1beta1.VMCluster, pool vmv1beta1.VMClusterPool) (*vmv1beta1.VMInsert, error) { + if pool.VMInsert == nil { + return nil, nil + } + base := cr.Spec.VMInsert + merged := pool.VMInsert.DeepCopy() + if base != nil { + if err := vmv1beta1.MergeDeep(merged, base, true); err != nil { + return nil, err + } + } + return merged, nil +} + +// storageNodeIDs returns available storage node IDs for the given VMStorage spec, +// skipping nodes under maintenance for the given component kind. +func storageNodeIDs(storage *vmv1beta1.VMStorage, kind vmv1beta1.ClusterComponent) []int32 { + if storage == nil || (storage.ReplicaCount == nil && storage.HPA == nil) { + return nil + } + maintenanceNodes := sets.New[int32]() + switch kind { + case vmv1beta1.ClusterComponentSelect: + maintenanceNodes.Insert(storage.MaintenanceSelectNodeIDs...) + case vmv1beta1.ClusterComponentInsert: + maintenanceNodes.Insert(storage.MaintenanceInsertNodeIDs...) + } + var replicaCount int32 + if storage.ReplicaCount != nil { + replicaCount = *storage.ReplicaCount + } else if storage.HPA != nil { + replicaCount = storage.HPA.GetMinReplicas() + } + var result []int32 + for i := int32(0); i < replicaCount; i++ { + if !maintenanceNodes.Has(i) { + result = append(result, i) + } + } + return result +} + +// findPool returns the pool with the given name and whether it was found. +func findPool(pools []vmv1beta1.VMClusterPool, name string) (vmv1beta1.VMClusterPool, bool) { + for _, p := range pools { + if p.Name == name { + return p, true + } + } + return vmv1beta1.VMClusterPool{}, false +} diff --git a/internal/controller/operator/factory/vmcluster/vmcluster_test.go b/internal/controller/operator/factory/vmcluster/vmcluster_test.go index 13cdabe14..1010ad525 100644 --- a/internal/controller/operator/factory/vmcluster/vmcluster_test.go +++ b/internal/controller/operator/factory/vmcluster/vmcluster_test.go @@ -858,6 +858,127 @@ func TestCreateOrUpdate(t *testing.T) { "managed-by": "vm-operator", }, svc.Labels) }}) + + // pools: two pools with shared vminsert — pool STSes get pool label, instance label stays the cluster name, top-level vmstorage not created + f(opts{ + cr: &vmv1beta1.VMCluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "cluster-1"}, + Spec: vmv1beta1.VMClusterSpec{ + RetentionPeriod: "1", + VMStorage: &vmv1beta1.VMStorage{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + VMSelect: &vmv1beta1.VMSelect{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + VMInsert: &vmv1beta1.VMInsert{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + Pools: []vmv1beta1.VMClusterPool{ + {Name: "hot"}, + {Name: "cold"}, + }, + }, + }, + validate: func(ctx context.Context, rclient client.Client, cr *vmv1beta1.VMCluster) { + for _, poolName := range []string{"hot", "cold"} { + stsName := cr.PoolPrefixedName(vmv1beta1.ClusterComponentStorage, poolName) + var sts appsv1.StatefulSet + assert.NoError(t, rclient.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: stsName}, &sts), "pool STS %s should exist", stsName) + // instance label must be the cluster name, not cluster-pool + assert.Equal(t, cr.Name, sts.Labels["app.kubernetes.io/instance"], "instance label for pool %s", poolName) + // STS selector must include the pool label so per-pool selectors are disjoint + assert.Equal(t, poolName, sts.Spec.Selector.MatchLabels["app.kubernetes.io/pool"], "selector pool label for pool %s", poolName) + // pod template must also carry the pool label so the STS selector matches its pods + assert.Equal(t, poolName, sts.Spec.Template.Labels["app.kubernetes.io/pool"], "pod template pool label for pool %s", poolName) + } + // top-level vmstorage must NOT be created when pools are defined + var topSts appsv1.StatefulSet + err := rclient.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: cr.PrefixedName(vmv1beta1.ClusterComponentStorage)}, &topSts) + assert.True(t, k8serrors.IsNotFound(err), "top-level vmstorage STS must not exist when pools are defined") + // shared vminsert must still be created (no pool has a dedicated insert) + var dep appsv1.Deployment + assert.NoError(t, rclient.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: cr.PrefixedName(vmv1beta1.ClusterComponentInsert)}, &dep)) + }, + }) + + // pools: pool with dedicated vminsert — pool insert created, top-level vminsert skipped + f(opts{ + cr: &vmv1beta1.VMCluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "cluster-1"}, + Spec: vmv1beta1.VMClusterSpec{ + RetentionPeriod: "1", + VMStorage: &vmv1beta1.VMStorage{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + VMSelect: &vmv1beta1.VMSelect{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + VMInsert: &vmv1beta1.VMInsert{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + Pools: []vmv1beta1.VMClusterPool{ + { + Name: "hot", + VMInsert: &vmv1beta1.VMInsert{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + }, + }, + }, + }, + validate: func(ctx context.Context, rclient client.Client, cr *vmv1beta1.VMCluster) { + // pool insert must exist with pool-scoped name + poolInsertName := cr.PoolPrefixedName(vmv1beta1.ClusterComponentInsert, "hot") + var poolDep appsv1.Deployment + assert.NoError(t, rclient.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: poolInsertName}, &poolDep)) + assert.Equal(t, cr.Name, poolDep.Labels["app.kubernetes.io/instance"]) + assert.Equal(t, "hot", poolDep.Labels["app.kubernetes.io/pool"]) + // top-level vminsert must NOT be created when any pool has its own insert + var topDep appsv1.Deployment + err := rclient.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: cr.PrefixedName(vmv1beta1.ClusterComponentInsert)}, &topDep) + assert.True(t, k8serrors.IsNotFound(err), "top-level vminsert must not exist when a pool has a dedicated insert") + }, + }) + + // pools: VMStorage.RetentionPeriod overrides cluster-level RetentionPeriod in the generated args + f(opts{ + cr: &vmv1beta1.VMCluster{ + ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "cluster-1"}, + Spec: vmv1beta1.VMClusterSpec{ + RetentionPeriod: "1", + VMStorage: &vmv1beta1.VMStorage{ + RetentionPeriod: "90d", + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + VMSelect: &vmv1beta1.VMSelect{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(0))}, + }, + }, + }, + validate: func(ctx context.Context, rclient client.Client, cr *vmv1beta1.VMCluster) { + var sts appsv1.StatefulSet + assert.NoError(t, rclient.Get(ctx, types.NamespacedName{Namespace: cr.Namespace, Name: cr.PrefixedName(vmv1beta1.ClusterComponentStorage)}, &sts)) + var storageArgs []string + for _, c := range sts.Spec.Template.Spec.Containers { + if c.Name == "vmstorage" { + storageArgs = c.Args + } + } + hasRetention90d := false + hasRetention1 := false + for _, a := range storageArgs { + if a == "-retentionPeriod=90d" { + hasRetention90d = true + } + if a == "-retentionPeriod=1" { + hasRetention1 = true + } + } + assert.True(t, hasRetention90d, "VMStorage.RetentionPeriod should override cluster-level: got args %v", storageArgs) + assert.False(t, hasRetention1, "cluster-level RetentionPeriod should be overridden: got args %v", storageArgs) + }, + }) } func TestCreatOrUpdateClusterServices(t *testing.T) { @@ -1961,6 +2082,45 @@ func TestVMClusterDiscoveryArgs(t *testing.T) { }, ) + // pools: vmselect gets pool-grouped storage nodes, shared vminsert gets plain addresses + f(&vmv1beta1.VMCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: vmv1beta1.VMClusterSpec{ + VMStorage: &vmv1beta1.VMStorage{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(2))}, + }, + VMSelect: &vmv1beta1.VMSelect{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + VMInsert: &vmv1beta1.VMInsert{ + CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}, + }, + Pools: []vmv1beta1.VMClusterPool{ + {Name: "hot", VMStorage: &vmv1beta1.VMStorage{CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(2))}}}, + {Name: "cold", VMStorage: &vmv1beta1.VMStorage{CommonAppsParams: vmv1beta1.CommonAppsParams{ReplicaCount: ptr.To(int32(1))}}}, + }, + }, + }, + func(t *testing.T, args []string) { + // vmselect: each pool appears as a named storage group (poolName/addr) + assert.True(t, hasArg(args, "-storageNode=hot/"), "vmselect: expected hot pool storageNode, got %v", args) + assert.True(t, hasArg(args, "-storageNode=cold/"), "vmselect: expected cold pool storageNode, got %v", args) + // no top-level (ungrouped) storage node when pools are defined + for _, a := range args { + if strings.HasPrefix(a, "-storageNode=") { + val := strings.TrimPrefix(a, "-storageNode=") + assert.True(t, strings.Contains(val, "/"), "vmselect: expected all storageNodes to be pool-grouped, got %q", a) + } + } + }, + func(t *testing.T, args []string) { + // shared vminsert: pool storage nodes are plain addresses without pool prefix + assert.True(t, hasArg(args, "-storageNode="), "vminsert: expected storageNode flag, got %v", args) + assert.False(t, hasArg(args, "-storageNode=hot/"), "vminsert: should not have pool-grouped addresses, got %v", args) + assert.False(t, hasArg(args, "-storageNode=cold/"), "vminsert: should not have pool-grouped addresses, got %v", args) + }, + ) + // component-level discovery overrides interval and filter f(&vmv1beta1.VMCluster{ ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, diff --git a/internal/controller/operator/factory/vmdistributed/util.go b/internal/controller/operator/factory/vmdistributed/util.go index 3b7a058d7..b23058a45 100644 --- a/internal/controller/operator/factory/vmdistributed/util.go +++ b/internal/controller/operator/factory/vmdistributed/util.go @@ -9,7 +9,7 @@ import ( "strings" vmv1alpha1 "github.com/VictoriaMetrics/operator/api/operator/v1alpha1" - "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/build" + vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1" "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/k8stools" ) @@ -253,7 +253,7 @@ func mergeSpecs[T any](a, b *T, name string) (*T, error) { } // Apply cluster-specific override if it exist - if err := build.MergeDeep(merged, b, false); err != nil { + if err := vmv1beta1.MergeDeep(merged, b, false); err != nil { return nil, fmt.Errorf("failed to merge spec: %w", err) } return merged, nil diff --git a/internal/converter/converter.go b/internal/converter/converter.go index a6cafc3fe..1e0265816 100644 --- a/internal/converter/converter.go +++ b/internal/converter/converter.go @@ -14,7 +14,6 @@ import ( vmv1 "github.com/VictoriaMetrics/operator/api/operator/v1" vmv1beta1 "github.com/VictoriaMetrics/operator/api/operator/v1beta1" - "github.com/VictoriaMetrics/operator/internal/controller/operator/factory/build" ) // VMSingleHelmValues represents values from VictoriaMetrics single helm chart @@ -357,7 +356,7 @@ func MergeValues(base, override []byte) ([]byte, error) { if baseMap == nil { return k8syaml.Marshal(overrideMap) } - if err := build.MergeDeep(&baseMap, &overrideMap, false); err != nil { + if err := vmv1beta1.MergeDeep(&baseMap, &overrideMap, false); err != nil { return nil, fmt.Errorf("cannot merge values: %w", err) } return k8syaml.Marshal(baseMap)