diff --git a/pkg/project/auth/cache.go b/pkg/project/auth/cache.go index c4fd41d70..e1f08561b 100644 --- a/pkg/project/auth/cache.go +++ b/pkg/project/auth/cache.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "k8s.io/klog/v2" @@ -173,6 +174,14 @@ func (l syncedClusterRoleBindingLister) LastSyncResourceVersion() string { return l.versioner.LastSyncResourceVersion() } +// authorizationCacheStores groups the three cache stores so they can be +// swapped atomically during full cache invalidation. +type authorizationCacheStores struct { + reviewRecordStore cache.Store + userSubjectRecordStore cache.Store + groupSubjectRecordStore cache.Store +} + // AuthorizationCache maintains a cache on the set of namespaces a user or group can access. type AuthorizationCache struct { // allKnownNamespaces we track all the known namespaces, so we can detect deletes. @@ -187,9 +196,7 @@ type AuthorizationCache struct { roleBindingNamespacer SyncedRoleBindingLister roleLastSyncResourceVersioner LastSyncResourceVersioner - reviewRecordStore cache.Store - userSubjectRecordStore cache.Store - groupSubjectRecordStore cache.Store + stores atomic.Pointer[authorizationCacheStores] clusterBindingResourceVersions sets.String clusterRoleResourceVersions sets.String @@ -255,10 +262,6 @@ func NewAuthorizationCache( roleBindingNamespacer: srbLister, roleLastSyncResourceVersioner: unionLastSyncResourceVersioner{scrLister, scrbLister, srLister, srbLister}, - reviewRecordStore: cache.NewStore(reviewRecordKeyFn), - userSubjectRecordStore: cache.NewStore(subjectRecordKeyFn), - groupSubjectRecordStore: cache.NewStore(subjectRecordKeyFn), - reviewer: reviewer, skip: &neverSkipSynchronizer{}, @@ -268,6 +271,11 @@ func NewAuthorizationCache( lastCacheInvalidation: realClock.Now(), maxCacheLifespan: defaultMaxCacheLifespan, } + ac.stores.Store(&authorizationCacheStores{ + reviewRecordStore: cache.NewStore(reviewRecordKeyFn), + userSubjectRecordStore: cache.NewStore(subjectRecordKeyFn), + groupSubjectRecordStore: cache.NewStore(subjectRecordKeyFn), + }) ac.lastSyncResourceVersioner = namespaceLastSyncResourceVersioner ac.syncHandler = ac.syncRequest return ac @@ -424,7 +432,7 @@ func (ac *AuthorizationCache) invalidateCache(expired bool) bool { return invalidateCache } -// synchronize runs a a full synchronization over the cache data. it must be run in a single-writer model, it's not thread-safe by design. +// synchronize runs a full synchronization over the cache data. It must be run in a single-writer model, it's not thread-safe by design. func (ac *AuthorizationCache) synchronize() { expired := ac.cacheHasExpired() // if none of our internal reflectors changed, then we can skip reviewing the cache @@ -434,9 +442,10 @@ func (ac *AuthorizationCache) synchronize() { } // by default, we update our current caches and do an incremental change - userSubjectRecordStore := ac.userSubjectRecordStore - groupSubjectRecordStore := ac.groupSubjectRecordStore - reviewRecordStore := ac.reviewRecordStore + currentStores := ac.stores.Load() + userSubjectRecordStore := currentStores.userSubjectRecordStore + groupSubjectRecordStore := currentStores.groupSubjectRecordStore + reviewRecordStore := currentStores.reviewRecordStore // if there was a global change that forced complete invalidation, we rebuild our cache and do a fast swap at end invalidateCache := ac.invalidateCache(expired) @@ -453,11 +462,13 @@ func (ac *AuthorizationCache) synchronize() { ac.synchronizeRoleBindings(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore) ac.purgeDeletedNamespaces(ac.allKnownNamespaces, newKnownNamespaces, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore) - // if we did a full rebuild, now we swap the fully rebuilt cache + // if we did a full rebuild, now we swap the fully rebuilt cache atomically if invalidateCache { - ac.userSubjectRecordStore = userSubjectRecordStore - ac.groupSubjectRecordStore = groupSubjectRecordStore - ac.reviewRecordStore = reviewRecordStore + ac.stores.Store(&authorizationCacheStores{ + userSubjectRecordStore: userSubjectRecordStore, + groupSubjectRecordStore: groupSubjectRecordStore, + reviewRecordStore: reviewRecordStore, + }) } ac.allKnownNamespaces = newKnownNamespaces @@ -511,14 +522,17 @@ func (ac *AuthorizationCache) List(userInfo user.Info, selector labels.Selector) user := userInfo.GetName() groups := userInfo.GetGroups() - obj, exists, _ := ac.userSubjectRecordStore.GetByKey(user) + // snapshot the stores pointer once so we read from a consistent pair + stores := ac.stores.Load() + + obj, exists, _ := stores.userSubjectRecordStore.GetByKey(user) if exists { subjectRecord := obj.(*subjectRecord) keys.Insert(subjectRecord.namespaces.List()...) } for _, group := range groups { - obj, exists, _ := ac.groupSubjectRecordStore.GetByKey(group) + obj, exists, _ := stores.groupSubjectRecordStore.GetByKey(group) if exists { subjectRecord := obj.(*subjectRecord) keys.Insert(subjectRecord.namespaces.List()...) @@ -600,10 +614,13 @@ func deleteNamespaceFromSubjects(subjectRecordStore cache.Store, subjects []stri for _, subject := range subjects { obj, exists, _ := subjectRecordStore.GetByKey(subject) if exists { - subjectRecord := obj.(*subjectRecord) - delete(subjectRecord.namespaces, namespace) - if len(subjectRecord.namespaces) == 0 { - subjectRecordStore.Delete(subjectRecord) + old := obj.(*subjectRecord) + newNamespaces := sets.NewString(old.namespaces.UnsortedList()...) + newNamespaces.Delete(namespace) + if len(newNamespaces) == 0 { + subjectRecordStore.Delete(old) + } else { + subjectRecordStore.Update(&subjectRecord{subject: subject, namespaces: newNamespaces}) } } } @@ -612,15 +629,15 @@ func deleteNamespaceFromSubjects(subjectRecordStore cache.Store, subjects []stri // addSubjectsToNamespace adds the specified namespace to each subject func addSubjectsToNamespace(subjectRecordStore cache.Store, subjects []string, namespace string) { for _, subject := range subjects { - var item *subjectRecord obj, exists, _ := subjectRecordStore.GetByKey(subject) if exists { - item = obj.(*subjectRecord) + old := obj.(*subjectRecord) + newNamespaces := sets.NewString(old.namespaces.UnsortedList()...) + newNamespaces.Insert(namespace) + subjectRecordStore.Update(&subjectRecord{subject: subject, namespaces: newNamespaces}) } else { - item = &subjectRecord{subject: subject, namespaces: sets.NewString()} - subjectRecordStore.Add(item) + subjectRecordStore.Add(&subjectRecord{subject: subject, namespaces: sets.NewString(namespace)}) } - item.namespaces.Insert(namespace) } } diff --git a/pkg/project/auth/cache_test.go b/pkg/project/auth/cache_test.go index d1de2d78f..d11762f5d 100644 --- a/pkg/project/auth/cache_test.go +++ b/pkg/project/auth/cache_test.go @@ -3,6 +3,7 @@ package auth import ( "fmt" "strconv" + "sync" "testing" "time" @@ -436,3 +437,133 @@ func TestAuthorizationCache_invalidateCache(t *testing.T) { }) } } + +func TestAuthorizationCacheRace(t *testing.T) { + namespaceList := corev1.NamespaceList{ + Items: []corev1.Namespace{ + {ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "3"}}, + }, + } + mockKubeClient := fake.NewSimpleClientset(&namespaceList) + + reviewer := &mockReviewer{ + expectedResults: map[string]*mockReview{ + "foo": { + users: []string{alice.GetName(), bob.GetName()}, + groups: eve.GetGroups(), + }, + "bar": { + users: []string{frank.GetName(), eve.GetName()}, + groups: []string{"random"}, + }, + "baz": { + users: []string{alice.GetName()}, + groups: []string{"employee"}, + }, + }, + } + + informers := informers.NewSharedInformerFactory(mockKubeClient, controller.NoResyncPeriodFunc()) + nsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + nsLister := corev1listers.NewNamespaceLister(nsIndexer) + + authorizationCache := NewAuthorizationCache( + nsLister, + informers.Core().V1().Namespaces().Informer(), + reviewer, + informers.Rbac().V1(), + ) + for i := range namespaceList.Items { + nsIndexer.Add(&namespaceList.Items[i]) + } + + // seed the cache + authorizationCache.synchronize() + + stop := make(chan struct{}) + var wg sync.WaitGroup + + // writer goroutine: continuously synchronize, toggling access + wg.Add(1) + go func() { + defer wg.Done() + rv := 10 + for { + select { + case <-stop: + return + default: + } + // toggle access patterns to force map mutations + if rv%2 == 0 { + reviewer.expectedResults["foo"] = &mockReview{ + users: []string{alice.GetName(), bob.GetName()}, + groups: eve.GetGroups(), + } + reviewer.expectedResults["baz"] = &mockReview{ + users: []string{frank.GetName()}, + groups: []string{}, + } + } else { + reviewer.expectedResults["foo"] = &mockReview{ + users: []string{frank.GetName()}, + groups: []string{"random"}, + } + reviewer.expectedResults["baz"] = &mockReview{ + users: []string{alice.GetName(), eve.GetName()}, + groups: []string{"employee"}, + } + } + rv++ + for i := range namespaceList.Items { + ns := namespaceList.Items[i] + ns.ResourceVersion = strconv.Itoa(rv) + nsIndexer.Update(&ns) + } + authorizationCache.synchronize() + } + }() + + // reader goroutines: continuously call List + users := []user.Info{alice, bob, eve, frank} + for _, u := range users { + u := u + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + } + _, err := authorizationCache.List(u, labels.Everything()) + if err != nil { + t.Errorf("List(%s) returned error: %v", u.GetName(), err) + return + } + } + }() + } + + time.Sleep(2 * time.Second) + close(stop) + wg.Wait() +} + +func BenchmarkAddSubjectsToNamespace(b *testing.B) { + for _, namespaceCount := range []int{10, 100, 1000} { + b.Run(fmt.Sprintf("namespaces=%d", namespaceCount), func(b *testing.B) { + subjects := []string{"alice", "bob", "eve", "frank", "grace"} + b.ResetTimer() + for i := 0; i < b.N; i++ { + store := cache.NewStore(subjectRecordKeyFn) + for ns := 0; ns < namespaceCount; ns++ { + addSubjectsToNamespace(store, subjects, fmt.Sprintf("namespace-%d", ns)) + } + } + }) + } +}