Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 154 additions & 47 deletions api/operator/v1beta1/vmcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,18 @@ type VMClusterSpec struct {
// See https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#automatic-vmstorage-discovery
// +optional
Discovery *VMClusterDiscovery `json:"discovery,omitempty"`

// Pools defines named groups of vmstorage (and optionally vminsert) components.
// Each pool gets its own StatefulSet and headless Service named <component>-<cluster>-<pool>.
// Top-level vmstorage and vminsert specs act as defaults; pool specs override them field-by-field.
// vmselect queries all pools using the pool name as a storage group name (-storageNode=<pool>/<addr>).
// When pools are defined the top-level vmstorage is not deployed; pools replace it entirely.
// The top-level vminsert is deployed as a shared insert across all pools only when no pool
// defines its own vminsert; as soon as any pool has a dedicated vminsert the top-level one is skipped.
// +optional
// +listType=map
// +listMapKey=name
Pools []VMClusterPool `json:"pools,omitempty"`
}

// VMClusterDiscovery configures automatic vmstorage node discovery for vminsert and vmselect.
Expand Down Expand Up @@ -128,7 +140,16 @@ func (d *VMClusterDiscovery) enabled() bool {
return d != nil && d.Enabled
}

func (d *VMClusterDiscovery) validate() error {
func (d *VMClusterDiscovery) validate(license *License) error {
if !d.enabled() {
return nil
}
if !license.IsProvided() {
return fmt.Errorf("discovery requires a valid license key, see https://docs.victoriametrics.com/victoriametrics/enterprise/")
}
if err := license.validate(); err != nil {
return err
}
if len(d.Filter) > 0 {
if _, err := regexp.Compile(d.Filter); err != nil {
return fmt.Errorf("discovery.filter is not a valid regexp: %w", err)
Expand Down Expand Up @@ -488,6 +509,11 @@ type VMStorage struct {
// it can be overwritten with component specific image.tag value.
// +optional
ComponentVersion string `json:"componentVersion,omitempty"`
// RetentionPeriod overrides the cluster-level retentionPeriod for this storage instance.
// Useful when using Pools to implement multi-retention setups.
// +optional
// +kubebuilder:validation:Pattern:="^[0-9]+(h|d|w|y)?$"
RetentionPeriod string `json:"retentionPeriod,omitempty"`
// PodMetadata configures Labels and Annotations which are propagated to the VMStorage pods.
PodMetadata *EmbeddedObjectMetadata `json:"podMetadata,omitempty"`
// LogFormat for VMStorage to be configured with.
Expand Down Expand Up @@ -707,6 +733,54 @@ func (cr *VMCluster) GetRemoteWriteURL() string {
return fmt.Sprintf("%s%s", insertURL, BuildPathWithPrefixFlag(cr.Spec.VMInsert.ExtraArgs, "/insert/multitenant/prometheus/api/v1/write"))
}

func (vms *VMStorage) validate(license *License, clusterRetentionPeriod string) error {
if vms.VMBackup != nil {
if err := vms.VMBackup.validate(license); err != nil {
return err
}
}
retention := clusterRetentionPeriod
if vms.RetentionPeriod != "" {
retention = vms.RetentionPeriod
}
if err := vms.RetentionFilters.validate(license, retention); err != nil {
return err
}
if vms.HPA != nil {
if vms.HPA.Behaviour != nil && vms.HPA.Behaviour.ScaleDown != nil {
return fmt.Errorf("scaledown HPA behavior is not supported")
}
if err := vms.HPA.Validate(); err != nil {
return err
}
}
if vms.VPA != nil {
if err := vms.VPA.Validate(); err != nil {
return err
}
}
if vms.RollingUpdateStrategyBehavior != nil {
if err := vms.RollingUpdateStrategyBehavior.Validate(); err != nil {
return err
}
}
return vms.Validate()
}

func (vmi *VMInsert) validate() error {
if vmi.HPA != nil {
if err := vmi.HPA.Validate(); err != nil {
return err
}
}
if vmi.VPA != nil {
if err := vmi.VPA.Validate(); err != nil {
return err
}
}
return vmi.Validate()
}

func (cr *VMCluster) Validate() error {
if MustSkipCRValidation(cr) {
return nil
Expand Down Expand Up @@ -742,17 +816,7 @@ func (cr *VMCluster) Validate() error {
if vmi.ServiceSpec != nil && vmi.ServiceSpec.Name == name {
return fmt.Errorf(".serviceSpec.Name cannot be equal to prefixed name=%q", name)
}
if vmi.HPA != nil {
if err := vmi.HPA.Validate(); err != nil {
return err
}
}
if vmi.VPA != nil {
if err := vmi.VPA.Validate(); err != nil {
return err
}
}
if err := vmi.Validate(); err != nil {
if err := vmi.validate(); err != nil {
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
return fmt.Errorf("vminsert: %w", err)
}
}
Expand All @@ -765,28 +829,7 @@ func (cr *VMCluster) Validate() error {
if vms.ServiceSpec != nil && vms.ServiceSpec.Name == name {
return fmt.Errorf(".serviceSpec.Name cannot be equal to prefixed name=%q", name)
}
if cr.Spec.VMStorage.VMBackup != nil {
if err := cr.Spec.VMStorage.VMBackup.validate(cr.Spec.License); err != nil {
return err
}
}
if err := vms.RetentionFilters.validate(cr.Spec.License, cr.Spec.RetentionPeriod); err != nil {
return err
}
if vms.RollingUpdateStrategyBehavior != nil {
if err := vms.RollingUpdateStrategyBehavior.Validate(); err != nil {
return fmt.Errorf("vmstorage: %w", err)
}
}
if vms.HPA != nil && vms.HPA.Behaviour != nil && vms.HPA.Behaviour.ScaleDown != nil {
return fmt.Errorf("vmstorage scaledown HPA behavior is not supported")
}
if vms.VPA != nil {
if err := vms.VPA.Validate(); err != nil {
return err
}
}
if err := vms.Validate(); err != nil {
if err := vms.validate(cr.Spec.License, cr.Spec.RetentionPeriod); err != nil {
return fmt.Errorf("vmstorage: %w", err)
}
}
Expand All @@ -807,34 +850,66 @@ func (cr *VMCluster) Validate() error {
if cr.Spec.VMSelect != nil {
vmselectDiscovery = cr.Spec.VMSelect.Discovery.OrDefault(cr.Spec.Discovery)
}
if vminsertDiscovery.enabled() || vmselectDiscovery.enabled() {
if !cr.Spec.License.IsProvided() {
return fmt.Errorf("discovery requires a valid license key, see https://docs.victoriametrics.com/victoriametrics/enterprise/")
}
if err := cr.Spec.License.validate(); err != nil {
return err
}
}
if vminsertDiscovery.enabled() {
if cr.Spec.VMStorage != nil && len(cr.Spec.VMStorage.MaintenanceInsertNodeIDs) > 0 {
return fmt.Errorf("maintenanceInsertNodeIDs cannot be used when vminsert discovery is enabled")
}
if err := vminsertDiscovery.validate(); err != nil {
return fmt.Errorf("vminsert: %w", err)
}
}
if err := vminsertDiscovery.validate(cr.Spec.License); err != nil {
return fmt.Errorf("vminsert: %w", err)
}
if vmselectDiscovery.enabled() {
if cr.Spec.VMStorage != nil && len(cr.Spec.VMStorage.MaintenanceSelectNodeIDs) > 0 {
return fmt.Errorf("maintenanceSelectNodeIDs cannot be used when vmselect discovery is enabled")
}
if err := vmselectDiscovery.validate(); err != nil {
return fmt.Errorf("vmselect: %w", err)
}
if err := vmselectDiscovery.validate(cr.Spec.License); err != nil {
return fmt.Errorf("vmselect: %w", err)
}

poolNames := make(map[string]struct{}, len(cr.Spec.Pools))
for i, pool := range cr.Spec.Pools {
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
if !poolNameRe.MatchString(pool.Name) {
return fmt.Errorf("pools[%d].name %q is invalid: must match ^[a-z0-9]([a-z0-9-]*[a-z0-9])?$", i, pool.Name)
}
if _, dup := poolNames[pool.Name]; dup {
return fmt.Errorf("pools[%d].name %q is duplicated", i, pool.Name)
}
poolNames[pool.Name] = struct{}{}
if pool.VMStorage != nil {
vms := pool.VMStorage.DeepCopy()
if cr.Spec.VMStorage != nil {
if err := MergeDeep(vms, cr.Spec.VMStorage, true); err != nil {
return fmt.Errorf("pools[%d] vmstorage merge: %w", i, err)
}
}
if err := vms.validate(cr.Spec.License, cr.Spec.RetentionPeriod); err != nil {
return fmt.Errorf("pools[%d] vmstorage: %w", i, err)
}
}
if pool.VMInsert != nil {
vmi := pool.VMInsert.DeepCopy()
if cr.Spec.VMInsert != nil {
if err := MergeDeep(vmi, cr.Spec.VMInsert, true); err != nil {
return fmt.Errorf("pools[%d] vminsert merge: %w", i, err)
}
}
if err := vmi.validate(); err != nil {
return fmt.Errorf("pools[%d] vminsert: %w", i, err)
}
poolDiscovery := vmi.Discovery.OrDefault(cr.Spec.Discovery)
if err := poolDiscovery.validate(cr.Spec.License); err != nil {
return fmt.Errorf("pools[%d] vminsert: %w", i, err)
}
}
}

return nil
}

// poolNameRe validates pool names: lowercase alphanumeric with interior hyphens.
var poolNameRe = regexp.MustCompile(`^[a-z0-9]([a-z0-9-]*[a-z0-9])?$`)

// AvailableStorageNodeIDs returns ids of the storage nodes for the provided component
func (cr *VMCluster) AvailableStorageNodeIDs(kind ClusterComponent) []int32 {
var result []int32
Expand Down Expand Up @@ -1184,3 +1259,35 @@ func (cr *VMAuthLoadBalancerSpec) UseTLS() bool {
func (cr *VMAuthLoadBalancerSpec) GetMetricsPath() string {
return BuildPathWithPrefixFlag(cr.ExtraArgs, metricsPath)
}

// VMClusterPool defines a named group of vmstorage (and optionally vminsert) components
// within a VMCluster. Each pool gets its own StatefulSet and headless Service.
// +k8s:openapi-gen=true
type VMClusterPool struct {
// Name is the unique identifier for this pool within the cluster.
// Used as a suffix for generated resource names and as a storage group name in vmselect.
// Must be a lowercase alphanumeric DNS label; hyphens allowed in the interior.
// +kubebuilder:validation:Pattern:="^[a-z0-9]([a-z0-9-]*[a-z0-9])?$"
// +kubebuilder:validation:MaxLength=32
Name string `json:"name"`
// VMStorage defines pool-specific vmstorage configuration.
// Each field overrides the corresponding field in the top-level vmstorage spec.
// Fields absent here inherit from the top-level vmstorage.
// RetentionPeriod on VMStorage overrides the cluster-level retentionPeriod for this pool.
// +optional
// +kubebuilder:validation:Schemaless
// +kubebuilder:pruning:PreserveUnknownFields
VMStorage *VMStorage `json:"vmstorage,omitempty"`
// VMInsert defines a dedicated vminsert for this pool.
// Each field overrides the corresponding field in the top-level vminsert spec.
// When nil, the top-level shared vminsert writes to this pool's storage nodes as well.
// +optional
// +kubebuilder:validation:Schemaless
// +kubebuilder:pruning:PreserveUnknownFields
VMInsert *VMInsert `json:"vminsert,omitempty"`
}

// PoolPrefixedName returns the Kubernetes resource name for the given component in a pool.
func (cr *VMCluster) PoolPrefixedName(kind ClusterComponent, poolName string) string {
return ClusterPrefixedName(kind, cr.Name, "vm", false) + "-" + poolName
}
56 changes: 56 additions & 0 deletions api/operator/v1beta1/vmextra_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1763,3 +1763,59 @@ func (bs *BytesString) String() string {
}
return string(*bs)
}

func mergeMapsRecursive(baseMap, overrideMap map[string]any) {
if len(overrideMap) == 0 {
return
}
for key, overrideValue := range overrideMap {
if baseVal, ok := baseMap[key]; ok {
if baseMapNested, isBaseMap := baseVal.(map[string]any); isBaseMap {
if overrideMapNested, isOverrideMap := overrideValue.(map[string]any); isOverrideMap {
mergeMapsRecursive(baseMapNested, overrideMapNested)
continue
}
}
}
baseMap[key] = overrideValue
}
}

// MergeDeep merges an override object into a base one using JSON round-trip.
// Fields present in the override overwrite corresponding fields in the base.
// When reverse is true the roles are swapped: base fills absent fields in override
// (useful when the override should win and the base provides defaults).
func MergeDeep[T comparable](base, override T, reverse bool) error {
var zero T
if override == zero {
return nil
}
baseJSON, err := json.Marshal(base)
if err != nil {
return fmt.Errorf("failed to marshal base spec: %w", err)
}
overrideJSON, err := json.Marshal(override)
if err != nil {
return fmt.Errorf("failed to marshal override spec: %w", err)
}
var baseMap map[string]any
if err := json.Unmarshal(baseJSON, &baseMap); err != nil {
return fmt.Errorf("failed to unmarshal base spec to map: %w", err)
}
var overrideMap map[string]any
if err := json.Unmarshal(overrideJSON, &overrideMap); err != nil {
return fmt.Errorf("failed to unmarshal override spec to map: %w", err)
}
if reverse {
baseMap, overrideMap = overrideMap, baseMap
}
mergeMapsRecursive(baseMap, overrideMap)
mergedJSON, err := json.Marshal(baseMap)
if err != nil {
return fmt.Errorf("failed to marshal merged spec map: %w", err)
}
if err := json.Unmarshal(mergedJSON, base); err != nil {
return fmt.Errorf("failed to unmarshal merged spec JSON: %w", err)
}
return nil
}
Loading
Loading