Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions gitops-engine/pkg/cache/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,24 @@ type clusterCache struct {
parentUIDToChildren map[types.UID]map[kube.ResourceKey]struct{}
}

// namespaceExists reports whether the given namespace should still be treated as present.
//
// An empty namespace denotes a cluster-wide operation and is always reported as existing.
// Otherwise the namespace is looked up through nsClient:
//   - a NotFound response records the namespace in deletedNamespaces and returns false;
//   - any other error is logged at verbosity 1 and the namespace is reported as existing.
//
// A namespace that is already recorded in deletedNamespaces is reported as missing without
// another lookup. This function is invoked once per API per namespace, so the short-circuit
// avoids repeating the same Get (and the same log line) for every remaining API, and keeps the
// answer consistent with the caller's later cleanup, which is driven by deletedNamespaces.
// Calls that race before the first Store may still each perform their own lookup.
//
// deletedNamespaces may be nil, in which case nothing is consulted or recorded.
func (c *clusterCache) namespaceExists(ctx context.Context, nsClient dynamic.NamespaceableResourceInterface, namespace string, deletedNamespaces *sync.Map) bool {
	if namespace == "" {
		return true // Cluster-wide operations don't need namespace validation
	}

	// Already known to be gone: skip the API round trip.
	if deletedNamespaces != nil {
		if _, deleted := deletedNamespaces.Load(namespace); deleted {
			return false
		}
	}

	_, err := nsClient.Get(ctx, namespace, metav1.GetOptions{})
	if err == nil {
		return true
	}

	if !apierrors.IsNotFound(err) {
		// Any error other than NotFound is not proof of deletion; keep processing the namespace.
		c.log.V(1).Info(fmt.Sprintf("Failed to get namespace '%s' existence: %v", namespace, err))
		return true
	}

	c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, marking for removal", namespace))
	if deletedNamespaces != nil {
		deletedNamespaces.Store(namespace, true)
	}
	return false
}

type clusterCacheSync struct {
// When using this struct:
// 1) 'lock' mutex should be acquired when reading/writing from fields of this struct.
Expand Down Expand Up @@ -668,14 +686,18 @@ func (c *clusterCache) startMissingWatches() error {
return fmt.Errorf("failed to create clientset: %w", err)
}
namespacedResources := make(map[schema.GroupKind]bool)

// For watch startup, we don't update namespaces list, so use empty map
deletedNamespaces := &sync.Map{}

for i := range apis {
api := apis[i]
namespacedResources[api.GroupKind] = api.Meta.Namespaced
if _, ok := c.apisMeta[api.GroupKind]; !ok {
ctx, cancel := context.WithCancel(context.Background())
c.apisMeta[api.GroupKind] = &apiMeta{namespaced: api.Meta.Namespaced, watchCancel: cancel}

err := c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
err := c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error {
resourceVersion, err := c.loadInitialState(ctx, api, resClient, ns, false) // don't lock here, we are already in a lock before startMissingWatches is called inside watchEvents
if err != nil && c.isRestrictedResource(err) {
keep := false
Expand Down Expand Up @@ -925,15 +947,26 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
// processApi processes all the resources for a given API. First we construct an API client for the given API. Then we
// call the callback. If we're managing the whole cluster, we call the callback with the client and an empty namespace.
// If we're managing specific namespaces, we call the callback for each namespace.
func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
func (c *clusterCache) processApi(client dynamic.Interface, api kube.APIResourceInfo, deletedNamespaces *sync.Map, callback func(resClient dynamic.ResourceInterface, ns string) error) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main problem with this function is that it is run asynchronously. deletedNamespaces might be empty for all calls. If you have 100 kinds, this will cause 100 calls to validate the namespace.

It seems better to check for namespace existence before processing the APIs in parallel.

resClient := client.Resource(api.GroupVersionResource)
switch {
// if manage whole cluster or resource is cluster level and cluster resources enabled
case len(c.namespaces) == 0 || (!api.Meta.Namespaced && c.clusterResources):
return callback(resClient, "")
// if manage some namespaces and resource is namespaced
case len(c.namespaces) != 0 && api.Meta.Namespaced:
nsClient := client.Resource(schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "namespaces",
})

for _, ns := range c.namespaces {
if !c.namespaceExists(context.Background(), nsClient, ns, deletedNamespaces) {
// Namespace was deleted, skip it (deletedNamespaces map tracks it for later cleanup)
continue
}

err := callback(resClient.Namespace(ns), ns)
if err != nil {
return err
Expand Down Expand Up @@ -1022,6 +1055,7 @@ func (c *clusterCache) sync() error {
c.namespacedResources = make(map[schema.GroupKind]bool)
c.parentUIDToChildren = make(map[types.UID]map[kube.ResourceKey]struct{})
config := c.config

version, err := c.kubectl.GetServerVersion(config)
if err != nil {
return fmt.Errorf("failed to get server version: %w", err)
Expand Down Expand Up @@ -1061,6 +1095,9 @@ func (c *clusterCache) sync() error {
go c.processEvents()
}

// Track deleted namespaces found during parallel processing
deletedNamespaces := &sync.Map{}

// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
lock := sync.Mutex{}
err = kube.RunAllAsync(len(apis), func(i int) error {
Expand All @@ -1073,7 +1110,7 @@ func (c *clusterCache) sync() error {
c.namespacedResources[api.GroupKind] = api.Meta.Namespaced
lock.Unlock()

return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
return c.processApi(client, api, deletedNamespaces, func(resClient dynamic.ResourceInterface, ns string) error {
resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
if un, ok := obj.(*unstructured.Unstructured); !ok {
Expand Down Expand Up @@ -1119,6 +1156,17 @@ func (c *clusterCache) sync() error {
return fmt.Errorf("failed to sync cluster %s: %w", c.config.Host, err)
}

// After parallel processing completes, update namespace list by removing deleted ones
var validNamespaces []string
for _, ns := range c.namespaces {
if _, deleted := deletedNamespaces.Load(ns); deleted {
c.log.Info(fmt.Sprintf("Namespace '%s' no longer exists, removing from cluster cache", ns))
continue
}
validNamespaces = append(validNamespaces, ns)
}
c.namespaces = validNamespaces

c.log.Info("Cluster successfully synced")
return nil
}
Expand Down
Loading
Loading