From 50e06e864a4c04f5291499ead20da7c90dacff2c Mon Sep 17 00:00:00 2001 From: Mikkel Oscar Lyderik Larsen Date: Fri, 16 Jan 2026 11:34:24 +0100 Subject: [PATCH] Refactor controller queue logic Signed-off-by: Mikkel Oscar Lyderik Larsen --- cmd/stackset-controller/main.go | 2 +- controller/stack_resources.go | 2 +- controller/stackset.go | 551 ++++++++++++++++++++++++++++++++ e2e/apply/rbac.yaml | 8 + go.mod | 2 +- pkg/clientset/unified.go | 4 +- 6 files changed, 564 insertions(+), 5 deletions(-) diff --git a/cmd/stackset-controller/main.go b/cmd/stackset-controller/main.go index a4b7bb17..5dbe2681 100644 --- a/cmd/stackset-controller/main.go +++ b/cmd/stackset-controller/main.go @@ -119,7 +119,7 @@ func main() { go handleSigterm(cancel) go serveMetrics(config.MetricsAddress) - err = controller.Run(ctx) + err = controller.Run2(ctx) if err != nil { cancel() log.Fatalf("Failed to run controller: %v", err) diff --git a/controller/stack_resources.go b/controller/stack_resources.go index d5d73aa4..b63e29fa 100644 --- a/controller/stack_resources.go +++ b/controller/stack_resources.go @@ -223,7 +223,7 @@ func (c *StackSetController) ReconcileStackIngress(ctx context.Context, stack *z apiv1.EventTypeNormal, "DeletedIngress", "Deleted Ingress %s", - existing.Namespace) + existing.Name) } return nil } diff --git a/controller/stackset.go b/controller/stackset.go index 76d41ae8..f05ed3bc 100644 --- a/controller/stackset.go +++ b/controller/stackset.go @@ -15,11 +15,18 @@ import ( "github.com/prometheus/client_golang/prometheus" log "github.com/sirupsen/logrus" rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1" + routegroupinformers "github.com/szuecs/routegroup-client/client/informers/externalversions" + rginformerv1 "github.com/szuecs/routegroup-client/client/informers/externalversions/zalando.org/v1" zv1 "github.com/zalando-incubator/stackset-controller/pkg/apis/zalando.org/v1" + stacksetinformers 
"github.com/zalando-incubator/stackset-controller/pkg/client/informers/externalversions" + stacksetinformerv1 "github.com/zalando-incubator/stackset-controller/pkg/client/informers/externalversions/zalando.org/v1" "github.com/zalando-incubator/stackset-controller/pkg/clientset" "github.com/zalando-incubator/stackset-controller/pkg/core" "github.com/zalando-incubator/stackset-controller/pkg/recorder" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" + appsv1 "k8s.io/api/apps/v1" + autoscalingv2 "k8s.io/api/autoscaling/v2" v1 "k8s.io/api/core/v1" networking "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/equality" @@ -27,10 +34,18 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + appsinformerv1 "k8s.io/client-go/informers/apps/v1" + autoscalinginformerv2 "k8s.io/client-go/informers/autoscaling/v2" + coreinformer "k8s.io/client-go/informers/core/v1" + ingressinformerv1 "k8s.io/client-go/informers/networking/v1" "k8s.io/client-go/tools/cache" kube_record "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" ) const ( @@ -46,12 +61,26 @@ const ( var configurationResourceNameError = "ConfigurationResource name must be prefixed by Stack name. 
ConfigurationResource: %s, Stack: %s" +type controllerInformers struct { + stacksetInformer stacksetinformerv1.StackSetInformer + stackInformer stacksetinformerv1.StackInformer + ingressInformer ingressinformerv1.IngressInformer + routegroupInformer rginformerv1.RouteGroupInformer + deploymentInformer appsinformerv1.DeploymentInformer + serviceInformer coreinformer.ServiceInformer + hpaInformer autoscalinginformerv2.HorizontalPodAutoscalerInformer + configMapInformer coreinformer.ConfigMapInformer + secretInformer coreinformer.SecretInformer + pcsInformer stacksetinformerv1.PlatformCredentialsSetInformer +} + // StackSetController is the main controller. It watches for changes to // stackset resources and starts and maintains other controllers per // stackset resource. type StackSetController struct { logger *log.Entry client clientset.Interface + informers controllerInformers config StackSetConfig stacksetEvents chan stacksetEvent stacksetStore map[types.UID]zv1.StackSet @@ -60,6 +89,7 @@ type StackSetController struct { HealthReporter healthcheck.Handler now func() string sync.Mutex + queue workqueue.TypedRateLimitingInterface[types.NamespacedName] } type StackSetConfig struct { @@ -125,6 +155,11 @@ func NewStackSetController( metricsReporter: metricsReporter, HealthReporter: healthcheck.NewHandler(), now: now, + queue: workqueue.NewTypedRateLimitingQueue(workqueue.NewTypedMaxOfRateLimiter( + workqueue.NewTypedItemExponentialFailureRateLimiter[types.NamespacedName](5*time.Millisecond, 5*time.Second), + // 10 qps, 100 bucket size. 
This is only for retry speed and it's only the overall factor (not per item) + &workqueue.TypedBucketRateLimiter[types.NamespacedName]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + )), }, nil } @@ -234,6 +269,82 @@ func (c *StackSetController) Run(ctx context.Context) error { } } +func (c *StackSetController) Run2(ctx context.Context) error { + // defer runtime.HandleCrash() + defer c.queue.ShutDown() + + // TODO: health + // TODO: metricsReporter + c.logger.Info("Starting Controller") + + err := c.setupInformers(ctx) + if err != nil { + return fmt.Errorf("failed to setup informers: %v", err) + } + + for i := 0; i < c.config.ReconcileWorkers; i++ { + go wait.UntilWithContext(ctx, c.worker, time.Second) + } + + <-ctx.Done() + c.logger.Info("Shutting down controller") + return nil +} + +func (c *StackSetController) worker(ctx context.Context) { + for c.processNextItem(ctx) { + } +} + +func (c *StackSetController) processNextItem(ctx context.Context) bool { + key, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(key) + + err := c.syncHandler(ctx, key) + if err == nil { + c.queue.Forget(key) + return true + } + + // runtime.HandleError(fmt.Errorf("sync %q failed: %v", key, err)) + c.queue.AddRateLimited(key) + return true +} + +func (c *StackSetController) syncHandler(ctx context.Context, key types.NamespacedName) error { + obj, exists, err := c.informers.stacksetInformer.Informer().GetIndexer().GetByKey(key.String()) + if err != nil { + return fmt.Errorf("failed to get StackSet %s from informer: %v", key.String(), err) + } + if !exists { + return nil // StackSet was deleted from the cache; retrying can never succeed + } + + stackset, ok := obj.(*zv1.StackSet) + if !ok { + return fmt.Errorf("failed to cast object to StackSet for %s", key.String()) + } + + if !c.hasOwnership(stackset) { + return nil + } + + container, err := c.stacksetContainer(stackset.DeepCopy()) + if err != nil { + return err + } + + err = c.ReconcileStackSet(ctx, container) + if 
err != nil { + c.stacksetLogger(container).Errorf("unable to reconcile a stackset: %v", err) + return err + } + return nil +} + // collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't // overload the API requests with unnecessary requests func (c *StackSetController) collectResources(ctx context.Context) (map[types.UID]*core.StackSetContainer, error) { @@ -567,6 +678,303 @@ func (c *StackSetController) collectPlatformCredentialsSet( return nil } +// collectResources collects resources for all stacksets at once and stores them per StackSet/Stack so that we don't +// overload the API requests with unnecessary requests +func (c *StackSetController) stacksetContainer(stackset *zv1.StackSet) (*core.StackSetContainer, error) { + key := types.NamespacedName{ + Namespace: stackset.Namespace, + Name: stackset.Name, + } + + fixupStackSetTypeMeta(stackset) + + reconciler := core.TrafficReconciler(&core.SimpleTrafficReconciler{}) + + // use prescaling logic if enabled with an annotation + if _, ok := stackset.Annotations[PrescaleStacksAnnotationKey]; ok { + resetDelay := defaultResetMinReplicasDelay + if resetDelayValue, ok := getResetMinReplicasDelay(stackset.Annotations); ok { + resetDelay = resetDelayValue + } + reconciler = &core.PrescalingTrafficReconciler{ + ResetHPAMinReplicasTimeout: resetDelay, + } + } + + stacksetContainer := core.NewContainer( + stackset, + reconciler, + c.config.BackendWeightsAnnotationKey, + c.config.ClusterDomains, + c.config.SyncIngressAnnotations, + ) + + labelSelector := labels.SelectorFromSet(labels.Set{(core.StacksetHeritageLabelKey): stackset.Name}) + err := c.addStacks(stacksetContainer, key, labelSelector) + if err != nil { + return nil, err + } + + err = c.addIngresses(stacksetContainer, key, labelSelector) + if err != nil { + return nil, err + } + + if c.config.RouteGroupSupportEnabled { + err = c.addRouteGroups(stacksetContainer, key, labelSelector) + if err != nil { + return 
nil, err + } + } + + err = c.addDeployments(stacksetContainer, key, labelSelector) + if err != nil { + return nil, err + } + + err = c.addServices(stacksetContainer, key, labelSelector) + if err != nil { + return nil, err + } + + err = c.addHPAs(stacksetContainer, key, labelSelector) + if err != nil { + return nil, err + } + + if c.config.ConfigMapSupportEnabled { + err = c.addConfigMaps(stacksetContainer, key, labelSelector) + if err != nil { + return nil, err + } + } + + if c.config.SecretSupportEnabled { + err = c.addSecrets(stacksetContainer, key, labelSelector) + if err != nil { + return nil, err + } + } + + if c.config.PcsSupportEnabled { + err = c.addPlatformCredentialsSet(stacksetContainer, key, labelSelector) + if err != nil { + return nil, err + } + } + + return stacksetContainer, nil +} + +func (c *StackSetController) addStacks(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error { + stacks, err := c.informers.stackInformer.Lister().Stacks(key.Namespace).List(labelSelector) + if err != nil { + return fmt.Errorf("failed to list Stacks for StackSet %s: %v", key.String(), err) + } + for _, stack := range stacks { + stack := stack.DeepCopy() + if uid, ok := getOwnerUID(stack.ObjectMeta); ok { + if uid == stacksetContainer.StackSet.UID { + fixupStackTypeMeta(stack) + + stacksetContainer.StackContainers[stack.UID] = &core.StackContainer{ + Stack: stack, + } + } + } + } + return nil +} + +func (c *StackSetController) addIngresses(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error { + ingresses, err := c.informers.ingressInformer.Lister().Ingresses(key.Namespace).List(labelSelector) + if err != nil { + return fmt.Errorf("failed to list Ingresses for StackSet %s: %v", key.String(), err) + } + for _, ingress := range ingresses { + ingress := ingress.DeepCopy() + if uid, ok := getOwnerUID(ingress.ObjectMeta); ok { + // stackset ingress + if uid == 
stacksetContainer.StackSet.UID { + stacksetContainer.Ingress = ingress + continue + } + + // stack ingress + if s, ok := stacksetContainer.StackContainers[uid]; ok { + if strings.HasSuffix( + ingress.ObjectMeta.Name, + core.SegmentSuffix, + ) { + // Traffic Segment + s.Resources.IngressSegment = ingress + } else { + s.Resources.Ingress = ingress + } + } + } + } + + return nil +} + +func (c *StackSetController) addRouteGroups(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error { + routegroups, err := c.informers.routegroupInformer.Lister().RouteGroups(key.Namespace).List(labelSelector) + if err != nil { + return fmt.Errorf("failed to list RouteGroups for StackSet %s: %v", key.String(), err) + } + for _, routegroup := range routegroups { + routegroup := routegroup.DeepCopy() + if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok { + // stackset routegroup + if uid == stacksetContainer.StackSet.UID { + stacksetContainer.RouteGroup = routegroup + continue + } + + // stack routegroup + if s, ok := stacksetContainer.StackContainers[uid]; ok { + if strings.HasSuffix( + routegroup.ObjectMeta.Name, + core.SegmentSuffix, + ) { + // Traffic Segment + s.Resources.RouteGroupSegment = routegroup + } else { + s.Resources.RouteGroup = routegroup + } + } + } + } + + return nil +} + +func (c *StackSetController) addDeployments(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error { + deployments, err := c.informers.deploymentInformer.Lister().Deployments(key.Namespace).List(labelSelector) + if err != nil { + return fmt.Errorf("failed to list Deployments for StackSet %s: %v", key.String(), err) + } + + for _, deployment := range deployments { + deployment := deployment.DeepCopy() + if uid, ok := getOwnerUID(deployment.ObjectMeta); ok { + if s, ok := stacksetContainer.StackContainers[uid]; ok { + s.Resources.Deployment = deployment + } + } + } + return nil +} + +func (c 
*StackSetController) addServices(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error { + services, err := c.informers.serviceInformer.Lister().Services(key.Namespace).List(labelSelector) + if err != nil { + return fmt.Errorf("failed to list Services for StackSet %s: %v", key.String(), err) + } + +Items: + for _, service := range services { + service := service.DeepCopy() + if uid, ok := getOwnerUID(service.ObjectMeta); ok { + if s, ok := stacksetContainer.StackContainers[uid]; ok { + s.Resources.Service = service + } + + // service/HPA used to be owned by the deployment for some reason + // TODO: check if this can be removed + for _, stack := range stacksetContainer.StackContainers { + if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid { + stack.Resources.Service = service + continue Items + } + } + } + } + return nil +} + +func (c *StackSetController) addHPAs(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error { + hpas, err := c.informers.hpaInformer.Lister().HorizontalPodAutoscalers(key.Namespace).List(labelSelector) + if err != nil { + return fmt.Errorf("failed to list HPAs for StackSet %s: %v", key.String(), err) + } + +Items: + for _, hpa := range hpas { + hpa := hpa.DeepCopy() + if uid, ok := getOwnerUID(hpa.ObjectMeta); ok { + if s, ok := stacksetContainer.StackContainers[uid]; ok { + s.Resources.HPA = hpa + } + + // service/HPA used to be owned by the deployment for some reason + // TODO: check if this can be removed + for _, stack := range stacksetContainer.StackContainers { + if stack.Resources.Deployment != nil && stack.Resources.Deployment.UID == uid { + stack.Resources.HPA = hpa + continue Items + } + } + } + } + return nil +} + +func (c *StackSetController) addConfigMaps(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error { + configMaps, err := 
c.informers.configMapInformer.Lister().ConfigMaps(key.Namespace).List(labelSelector) + if err != nil { + return fmt.Errorf("failed to list ConfigMaps for StackSet %s: %v", key.String(), err) + } + + for _, configMap := range configMaps { + configMap := configMap.DeepCopy() + if uid, ok := getOwnerUID(configMap.ObjectMeta); ok { + if s, ok := stacksetContainer.StackContainers[uid]; ok { + s.Resources.ConfigMaps = append(s.Resources.ConfigMaps, configMap) + } + } + } + return nil +} + +func (c *StackSetController) addSecrets(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error { + secrets, err := c.informers.secretInformer.Lister().Secrets(key.Namespace).List(labelSelector) + if err != nil { + return fmt.Errorf("failed to list Secrets for StackSet %s: %v", key.String(), err) + } + + for _, secret := range secrets { + secret := secret.DeepCopy() + if uid, ok := getOwnerUID(secret.ObjectMeta); ok { + if s, ok := stacksetContainer.StackContainers[uid]; ok { + s.Resources.Secrets = append(s.Resources.Secrets, secret) + } + } + } + return nil +} + +func (c *StackSetController) addPlatformCredentialsSet(stacksetContainer *core.StackSetContainer, key types.NamespacedName, labelSelector labels.Selector) error { + platformCredentialsSets, err := c.informers.pcsInformer.Lister().PlatformCredentialsSets(key.Namespace).List(labelSelector) + if err != nil { + return fmt.Errorf("failed to list PlatformCredentialsSets for StackSet %s: %v", key.String(), err) + } + + for _, pcs := range platformCredentialsSets { + pcs := pcs.DeepCopy() + if uid, ok := getOwnerUID(pcs.ObjectMeta); ok { + if s, ok := stacksetContainer.StackContainers[uid]; ok { + s.Resources.PlatformCredentialsSets = append( + s.Resources.PlatformCredentialsSets, + pcs, + ) + } + } + } + return nil +} + func getOwnerUID(objectMeta metav1.ObjectMeta) (types.UID, bool) { if len(objectMeta.OwnerReferences) == 1 { return objectMeta.OwnerReferences[0].UID, true @@ -629,6 
+1037,125 @@ func (c *StackSetController) startWatch(ctx context.Context) error { return nil } +type informerInterface interface { + Informer() cache.SharedIndexInformer +} + +func (c *StackSetController) setupInformers(ctx context.Context) error { + factory := informers.NewSharedInformerFactoryWithOptions(c.client, 0, informers.WithNamespace(c.config.Namespace)) + + stackSetFactory := stacksetinformers.NewSharedInformerFactoryWithOptions(c.client, 24*time.Hour, stacksetinformers.WithNamespace(c.config.Namespace)) + rgClient := c.client.(*clientset.Clientset) + routegroupFactory := routegroupinformers.NewSharedInformerFactoryWithOptions(rgClient.RouteGroup, 0, routegroupinformers.WithNamespace(c.config.Namespace)) + + c.informers.stacksetInformer = stackSetFactory.Zalando().V1().StackSets() + c.informers.stackInformer = stackSetFactory.Zalando().V1().Stacks() + c.informers.ingressInformer = factory.Networking().V1().Ingresses() + c.informers.routegroupInformer = routegroupFactory.Zalando().V1().RouteGroups() + c.informers.deploymentInformer = factory.Apps().V1().Deployments() + c.informers.serviceInformer = factory.Core().V1().Services() + c.informers.hpaInformer = factory.Autoscaling().V2().HorizontalPodAutoscalers() + c.informers.configMapInformer = factory.Core().V1().ConfigMaps() + c.informers.secretInformer = factory.Core().V1().Secrets() + c.informers.pcsInformer = stackSetFactory.Zalando().V1().PlatformCredentialsSets() + + resourceInformers := []struct { + informer informerInterface + resync time.Duration + }{ + {c.informers.stacksetInformer, c.config.Interval}, + {c.informers.stackInformer, 0}, + {c.informers.ingressInformer, 0}, + {c.informers.routegroupInformer, 0}, + {c.informers.deploymentInformer, 0}, + {c.informers.serviceInformer, 0}, + {c.informers.hpaInformer, 0}, + {c.informers.configMapInformer, 0}, + {c.informers.secretInformer, 0}, + {c.informers.pcsInformer, 0}, + } + + var hasSynced []cache.InformerSynced + for _, informer := range 
resourceInformers { + _, err := informer.informer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addResource, + UpdateFunc: c.updateResource, + DeleteFunc: c.deleteResource, + }, + informer.resync) + if err != nil { + return fmt.Errorf("failed to add event handler: %v", err) + } + + hasSynced = append(hasSynced, informer.informer.Informer().HasSynced) + go informer.informer.Informer().Run(ctx.Done()) + } + + if !cache.WaitForCacheSync(ctx.Done(), + hasSynced..., + ) { + return fmt.Errorf("failed to sync informers") + } + + return nil +} + +func objToNamespacedName(obj any) (types.NamespacedName, bool) { + switch typedObj := obj.(type) { + case *zv1.StackSet: + return types.NamespacedName{ + Namespace: typedObj.Namespace, + Name: typedObj.Name, + }, true + case *zv1.Stack, *networking.Ingress, *rgv1.RouteGroup, *appsv1.Deployment, *v1.Service, *autoscalingv2.HorizontalPodAutoscaler, *v1.ConfigMap, *v1.Secret, *zv1.PlatformCredentialsSet: + meta, ok := obj.(metav1.Object) + if !ok { + return types.NamespacedName{}, false + } + + if stackset, ok := meta.GetLabels()[core.StacksetHeritageLabelKey]; ok { + return types.NamespacedName{ + Namespace: meta.GetNamespace(), + Name: stackset, + }, true + } + + return types.NamespacedName{}, false + default: + return types.NamespacedName{}, false + } +} + +func (c *StackSetController) addResource(obj any) { + key, ok := objToNamespacedName(obj) + if !ok { + return + } + c.queue.Add(key) +} + +func (c *StackSetController) updateResource(oldObj, newObj any) { + c.addResource(newObj) +} + +func (c *StackSetController) deleteResource(obj any) { + stackset, ok := obj.(*zv1.StackSet) + if !ok { + // non-stackset deletions indicate refresh to resource + // associated with a stackset + c.addResource(obj) + return + } + + key := types.NamespacedName{ + Namespace: stackset.Namespace, + Name: stackset.Name, + } + + c.queue.Forget(key) + // note: Done must not be called here; the key was never obtained via Get +} + +func (c *StackSetController) add(obj 
interface{}) { stackset, ok := obj.(*zv1.StackSet) if !ok { @@ -1092,16 +1619,24 @@ func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *c } }() + var errors []error + // Create current stack, if needed. Proceed on errors. err = c.CreateCurrentStack(ctx, container) if err != nil { err = c.errorEventf(container.StackSet, "FailedCreateStack", err) c.stacksetLogger(container).Errorf("Unable to create stack: %v", err) + errors = append(errors, err) } // Update statuses from external resources (ingresses, deployments, etc). Abort on errors. err = container.UpdateFromResources() if err != nil { + c.recorder.Eventf( + container.StackSet, + v1.EventTypeWarning, + "FailedUpdateFromResources", + "Failed to update from resources: "+err.Error()) return err } @@ -1114,6 +1649,7 @@ func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *c v1.EventTypeWarning, "TrafficNotSwitched", "Failed to switch traffic: "+err.Error()) + errors = append(errors, err) } // Mark stacks that should be removed @@ -1131,6 +1667,7 @@ func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *c "Unable to reconcile traffic segments: %v", err, ) + errors = append(errors, err) } // Reconcile stack resources. Proceed on errors. 
@@ -1145,6 +1682,7 @@ func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *c "Unable to reconcile stack resources: %v", err, ) + errors = append(errors, err) } } @@ -1157,6 +1695,7 @@ func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *c if err != nil { err = c.errorEventf(sc.Stack, "FailedManageStack", err) c.stackLogger(container, sc).Errorf("Unable to reconcile stack resources: %v", err) + errors = append(errors, err) } } @@ -1165,6 +1704,7 @@ func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *c if err != nil { err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err) c.stacksetLogger(container).Errorf("Unable to reconcile stackset resources: %v", err) + errors = append(errors, err) } // Reconcile desired traffic in the stackset. Proceed on errors. @@ -1172,6 +1712,7 @@ func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *c if err != nil { err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err) c.stacksetLogger(container).Errorf("Unable to reconcile stackset traffic: %v", err) + errors = append(errors, err) } // Delete old stacks. Proceed on errors. @@ -1179,14 +1720,24 @@ func (c *StackSetController) ReconcileStackSet(ctx context.Context, container *c if err != nil { err = c.errorEventf(container.StackSet, reasonFailedManageStackSet, err) c.stacksetLogger(container).Errorf("Unable to delete old stacks: %v", err) + errors = append(errors, err) } // Update statuses. 
err = c.ReconcileStatuses(ctx, container) if err != nil { + c.recorder.Eventf( + container.StackSet, + v1.EventTypeWarning, + "FailedUpdateStatuses", + "Failed to update statuses: "+err.Error()) return err } + if len(errors) > 0 { + return fmt.Errorf("encountered %d errors during reconciliation, see events/logs for details", len(errors)) + } + return nil } diff --git a/e2e/apply/rbac.yaml b/e2e/apply/rbac.yaml index 90e97d17..cc37fabe 100644 --- a/e2e/apply/rbac.yaml +++ b/e2e/apply/rbac.yaml @@ -31,6 +31,7 @@ rules: verbs: - get - list + - watch - create - update - patch @@ -42,6 +43,7 @@ rules: verbs: - get - list + - watch - create - update - patch @@ -53,6 +55,7 @@ rules: verbs: - get - list + - watch - create - update - patch @@ -64,6 +67,7 @@ rules: verbs: - get - list + - watch - create - update - patch @@ -75,6 +79,7 @@ rules: verbs: - get - list + - watch - create - update - patch @@ -86,6 +91,7 @@ rules: verbs: - get - list + - watch - create - update - patch @@ -97,6 +103,7 @@ rules: verbs: - get - list + - watch - create - update - patch @@ -108,6 +115,7 @@ rules: verbs: - get - list + - watch - create - update - patch diff --git a/go.mod b/go.mod index 6fd04125..e0d33bff 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/stretchr/testify v1.11.1 github.com/szuecs/routegroup-client v0.34.1 golang.org/x/sync v0.19.0 + golang.org/x/time v0.9.0 k8s.io/api v0.34.3 k8s.io/apimachinery v0.34.3 k8s.io/client-go v0.34.3 @@ -63,7 +64,6 @@ require ( golang.org/x/sys v0.36.0 // indirect golang.org/x/term v0.35.0 // indirect golang.org/x/text v0.29.0 // indirect - golang.org/x/time v0.9.0 // indirect golang.org/x/tools v0.37.0 // indirect google.golang.org/protobuf v1.36.8 // indirect gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect diff --git a/pkg/clientset/unified.go b/pkg/clientset/unified.go index 72686f9e..80cba733 100644 --- a/pkg/clientset/unified.go +++ b/pkg/clientset/unified.go @@ -18,7 +18,7 @@ type Interface interface { type 
Clientset struct { kubernetes.Interface stackset stackset.Interface - routegroup rg.Interface + RouteGroup rg.Interface } func NewClientset(kubernetes kubernetes.Interface, stackset stackset.Interface, routegroup rg.Interface) *Clientset { @@ -53,5 +53,5 @@ func (c *Clientset) ZalandoV1() zalandov1.ZalandoV1Interface { } func (c *Clientset) RouteGroupV1() rgv1.ZalandoV1Interface { - return c.routegroup.ZalandoV1() + return c.RouteGroup.ZalandoV1() }