diff --git a/gitops-engine/pkg/cache/cluster.go b/gitops-engine/pkg/cache/cluster.go index 863d87d7898e8..7a0fcb556d765 100644 --- a/gitops-engine/pkg/cache/cluster.go +++ b/gitops-engine/pkg/cache/cluster.go @@ -287,6 +287,24 @@ type clusterCache struct { parentUIDToChildren map[types.UID]map[kube.ResourceKey]struct{} } +func (c *clusterCache) namespaceExists(ctx context.Context, nsClient dynamic.NamespaceableResourceInterface, namespace string, deletedNamespaces *sync.Map) bool { + if namespace == "" { + return true // Cluster-wide operations don't need namespace validation + } + + _, err := nsClient.Get(ctx, namespace, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, marking for removal", namespace)) + deletedNamespaces.Store(namespace, true) + return false + } + c.log.V(1).Info(fmt.Sprintf("Failed to get namespace '%s' existence: %v", namespace, err)) + } + + return true +} + type clusterCacheSync struct { // When using this struct: // 1) 'lock' mutex should be acquired when reading/writing from fields of this struct. @@ -668,6 +686,10 @@ func (c *clusterCache) startMissingWatches() error { return fmt.Errorf("failed to create clientset: %w", err) } namespacedResources := make(map[schema.GroupKind]bool) + + // For watch startup, we don't update namespaces list, so use empty map + deletedNamespaces := &sync.Map{} + for i := range apis { api := apis[i] namespacedResources[api.GroupKind] = api.Meta.Namespaced @@ -675,7 +697,7 @@ func (c *clusterCache) startMissingWatches() error { ctx, cancel := context.WithCancel(context.Background()) c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel} - err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error { + err := c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error { resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns, false) // don't lock here, we are already in a lock before startMissingWatches is called inside watchEvents if err != nil && c.isRestrictedResource(err) { keep := false @@ -925,7 +947,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo // processApi processes all the resources for a given API. First we construct an API client for the given API. Then we // call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace. // If we're managing specific namespaces, we call the callback for each namespace. -func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, callback func(resClient dynamic.ResourceInterface, ns string) error) error { +func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, deletedNamespaces *sync.Map, callback func(resClient dynamic.ResourceInterface, ns string) error) error { resClient := client.Resource(api.GroupVersionResource) switch { // if manage whole cluster or resource is cluster level and cluster resources enabled @@ -933,7 +955,18 @@ func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResource return callback(resClient, "") // if manage some namespaces and resource is namespaced case len(c.namespaces) != 0 && api.Meta.Namespaced: + nsClient := client.Resource(schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }) + for _, ns := range c.namespaces { + if !c.namespaceExists(context.Background(), nsClient, ns, deletedNamespaces) { + // Namespace was deleted, skip it (deletedNamespaces map tracks it for later cleanup) + continue + } + err := callback(resClient.Namespace(ns), ns) if err != nil { return err @@ -1022,6 +1055,7 @@ func (c *clusterCache) sync() error { c.namespacedResources = make(map[schema.GroupKind]bool) c.parentUIDToChildren = make(map[types.UID]map[kube.ResourceKey]struct{}) config := c.config + version, err := c.kubectl.GetServerVersion(config) if err != nil { return fmt.Errorf("failed to get server version: %w", err) @@ -1061,6 +1095,9 @@ func (c *clusterCache) sync() error { go c.processEvents() } + // Track deleted namespaces found during parallel processing + deletedNamespaces := &sync.Map{} + // Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields. lock := sync.Mutex{} err = kube.RunAllAsync(len(apis), func(i int) error { @@ -1073,7 +1110,7 @@ func (c *clusterCache) sync() error { c.namespacedResources[api.GroupKind] = api.Meta.Namespaced lock.Unlock() - return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error { + return c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error { resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error { return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error { if un, ok := obj.(*unstructured.Unstructured); !ok { @@ -1119,6 +1156,17 @@ func (c *clusterCache) sync() error { return fmt.Errorf("failed to sync cluster %s: %w", c.config.Host, err) } + // After parallel processing completes, update namespace list by removing deleted ones + var validNamespaces []string + for _, ns := range c.namespaces { + if _, deleted := deletedNamespaces.Load(ns); deleted { + c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, removing from cluster cache", ns)) + continue + } + validNamespaces = append(validNamespaces, ns) + } + c.namespaces = validNamespaces + c.log.Info("Cluster successfully synced") return nil } diff --git a/gitops-engine/pkg/cache/cluster_test.go b/gitops-engine/pkg/cache/cluster_test.go index 8548b330734d2..d6abc65fbde57 100644 --- a/gitops-engine/pkg/cache/cluster_test.go +++ b/gitops-engine/pkg/cache/cluster_test.go @@ -345,6 +345,16 @@ func TestStatefulSetOwnershipInferred(t *testing.T) { }, } + defaultNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + tests := []struct { name string cluster *clusterCache @@ -354,7 +364,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) { }{ { name: "STSTemplateNameNotMatching", - cluster: newCluster(t, sts), + cluster: newCluster(t, sts, defaultNs), pvc: &corev1.PersistentVolumeClaim{ TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"}, @@ -363,7 +373,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) { }, { name: "MatchingSTSExists", - cluster: newCluster(t, sts), + cluster: newCluster(t, sts, defaultNs), pvc: &corev1.PersistentVolumeClaim{ TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"}, @@ -372,7 +382,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) { }, { name: "STSTemplateNameNotMatchingWithBatchProcessing", - cluster: newClusterWithOptions(t, opts, sts), + cluster: newClusterWithOptions(t, opts, sts, defaultNs), pvc: &corev1.PersistentVolumeClaim{ TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, ObjectMeta: metav1.ObjectMeta{Name: "www1-web-0", Namespace: "default"}, @@ -381,7 +391,7 @@ func TestStatefulSetOwnershipInferred(t *testing.T) { }, { name: "MatchingSTSExistsWithBatchProcessing", - cluster: newClusterWithOptions(t, opts, sts), + cluster: newClusterWithOptions(t, opts, sts, defaultNs), pvc: &corev1.PersistentVolumeClaim{ TypeMeta: metav1.TypeMeta{Kind: kube.PersistentVolumeClaimKind}, ObjectMeta: metav1.ObjectMeta{Name: "www-web-0", Namespace: "default"}, @@ -560,7 +570,26 @@ func TestEnsureSyncedSingleNamespace(t *testing.T) { }, } - cluster := newCluster(t, obj1, obj2) + ns1 := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default1", + }, + } + ns2 := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default2", + }, + } + + cluster := newCluster(t, obj1, obj2, ns1, ns2) cluster.namespaces = []string{"default1"} err := cluster.EnsureSynced() require.NoError(t, err) @@ -646,7 +675,26 @@ metadata: } func TestGetManagedLiveObjsNamespacedModeClusterLevelResource(t *testing.T) { - cluster := newCluster(t, testPod1(), testRS(), testDeploy()) + defaultNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + productionNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "production", + }, + } + + cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs) cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) { return nil, true })) @@ -671,7 +719,26 @@ metadata: } func TestGetManagedLiveObjsNamespacedModeClusterLevelResource_ClusterResourceEnabled(t *testing.T) { - cluster := newCluster(t, testPod1(), testRS(), testDeploy()) + defaultNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + productionNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "production", + }, + } + + cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs) cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) { return nil, true })) @@ -740,7 +807,26 @@ metadata: } func TestGetManagedLiveObjsValidNamespace(t *testing.T) { - cluster := newCluster(t, testPod1(), testRS(), testDeploy()) + defaultNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + productionNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "production", + }, + } + + cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, productionNs) cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) { return nil, true })) @@ -768,7 +854,26 @@ metadata: } func TestGetManagedLiveObjsInvalidNamespace(t *testing.T) { - cluster := newCluster(t, testPod1(), testRS(), testDeploy()) + defaultNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + developNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "develop", + }, + } + + cluster := newCluster(t, testPod1(), testRS(), testDeploy(), defaultNs, developNs) cluster.Invalidate(SetPopulateResourceInfoHandler(func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) { return nil, true })) @@ -821,7 +926,18 @@ func TestGetManagedLiveObjsFailedConversion(t *testing.T) { t.Run(testCaseCopy.name, func(t *testing.T) { err := apiextensions.AddToScheme(scheme.Scheme) require.NoError(t, err) - cluster := newCluster(t, testCRD(), testCronTab()). + + defaultNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + } + + cluster := newCluster(t, testCRD(), testCronTab(), defaultNs). WithAPIResources([]kube.APIResourceInfo{ { GroupKind: schema.GroupKind{Group: cronTabGroup, Kind: "CronTab"}, @@ -2643,3 +2759,72 @@ func BenchmarkIncrementalIndexBuild(b *testing.B) { }) } } + +func TestStartMissingWatchesWithDeletedNamespace(t *testing.T) { + deletedNamespace := "deleted-namespace" + validNamespace := "default" + pod := testPod1() + pod.SetNamespace(validNamespace) + validNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: validNamespace, + }, + } + cluster := newCluster(t, pod, validNs) + cluster.namespaces = []string{validNamespace, deletedNamespace} + + // Initial sync to populate apisMeta (sync removes the deleted namespace) + err := cluster.sync() + require.NoError(t, err) + + // Simulate namespace deleted after sync but before watch restart + cluster.namespaces = []string{validNamespace, deletedNamespace} + + // Stop watches for Pods so startMissingWatches will restart them + podGK := pod.GroupVersionKind().GroupKind() + cluster.stopWatching(podGK, validNamespace) + + client := cluster.kubectl.(*kubetest.MockKubectlCmd).DynamicClient.(*fake.FakeDynamicClient) + client.ClearActions() + + // Call startMissingWatches under lock (as it would be in production) + err = runSynced(&cluster.lock, func() error { + return cluster.startMissingWatches() + }) + require.NoError(t, err) + + // Verify no list actions were performed for the deleted namespace + for _, action := range client.Actions() { + if action.GetVerb() == "list" && action.GetNamespace() == deletedNamespace { + t.Errorf("unexpected list action for deleted namespace %q: %v", deletedNamespace, action) + } + } +} + +func TestSyncWithDeletedNamespace(t *testing.T) { + deletedNamespace := "deleted-namespace" + validNamespace := "default" + pod := testPod1() + pod.SetNamespace(validNamespace) + validNs := &corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: validNamespace, + }, + } + cluster := newCluster(t, pod, validNs) + cluster.namespaces = []string{validNamespace, deletedNamespace} + + err := cluster.sync() + + require.NoError(t, err) + assert.NotContains(t, cluster.namespaces, deletedNamespace) + assert.Contains(t, cluster.namespaces, validNamespace) +}