Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 43 additions & 26 deletions pkg/project/auth/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -173,6 +174,14 @@ func (l syncedClusterRoleBindingLister) LastSyncResourceVersion() string {
return l.versioner.LastSyncResourceVersion()
}

// authorizationCacheStores groups the three cache stores so they can be
// swapped atomically during full cache invalidation.
type authorizationCacheStores struct {
reviewRecordStore cache.Store
userSubjectRecordStore cache.Store
groupSubjectRecordStore cache.Store
}

// AuthorizationCache maintains a cache on the set of namespaces a user or group can access.
type AuthorizationCache struct {
// allKnownNamespaces we track all the known namespaces, so we can detect deletes.
Expand All @@ -187,9 +196,7 @@ type AuthorizationCache struct {
roleBindingNamespacer SyncedRoleBindingLister
roleLastSyncResourceVersioner LastSyncResourceVersioner

reviewRecordStore cache.Store
userSubjectRecordStore cache.Store
groupSubjectRecordStore cache.Store
stores atomic.Pointer[authorizationCacheStores]

clusterBindingResourceVersions sets.String
clusterRoleResourceVersions sets.String
Expand Down Expand Up @@ -255,10 +262,6 @@ func NewAuthorizationCache(
roleBindingNamespacer: srbLister,
roleLastSyncResourceVersioner: unionLastSyncResourceVersioner{scrLister, scrbLister, srLister, srbLister},

reviewRecordStore: cache.NewStore(reviewRecordKeyFn),
userSubjectRecordStore: cache.NewStore(subjectRecordKeyFn),
groupSubjectRecordStore: cache.NewStore(subjectRecordKeyFn),

reviewer: reviewer,
skip: &neverSkipSynchronizer{},

Expand All @@ -268,6 +271,11 @@ func NewAuthorizationCache(
lastCacheInvalidation: realClock.Now(),
maxCacheLifespan: defaultMaxCacheLifespan,
}
ac.stores.Store(&authorizationCacheStores{
reviewRecordStore: cache.NewStore(reviewRecordKeyFn),
userSubjectRecordStore: cache.NewStore(subjectRecordKeyFn),
groupSubjectRecordStore: cache.NewStore(subjectRecordKeyFn),
})
ac.lastSyncResourceVersioner = namespaceLastSyncResourceVersioner
ac.syncHandler = ac.syncRequest
return ac
Expand Down Expand Up @@ -424,7 +432,7 @@ func (ac *AuthorizationCache) invalidateCache(expired bool) bool {
return invalidateCache
}

// synchronize runs a a full synchronization over the cache data. it must be run in a single-writer model, it's not thread-safe by design.
// synchronize runs a full synchronization over the cache data. It must be run in a single-writer model, it's not thread-safe by design.
func (ac *AuthorizationCache) synchronize() {
expired := ac.cacheHasExpired()
// if none of our internal reflectors changed, then we can skip reviewing the cache
Expand All @@ -434,9 +442,10 @@ func (ac *AuthorizationCache) synchronize() {
}

// by default, we update our current caches and do an incremental change
userSubjectRecordStore := ac.userSubjectRecordStore
groupSubjectRecordStore := ac.groupSubjectRecordStore
reviewRecordStore := ac.reviewRecordStore
currentStores := ac.stores.Load()
userSubjectRecordStore := currentStores.userSubjectRecordStore
groupSubjectRecordStore := currentStores.groupSubjectRecordStore
reviewRecordStore := currentStores.reviewRecordStore

// if there was a global change that forced complete invalidation, we rebuild our cache and do a fast swap at end
invalidateCache := ac.invalidateCache(expired)
Expand All @@ -453,11 +462,13 @@ func (ac *AuthorizationCache) synchronize() {
ac.synchronizeRoleBindings(userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)
ac.purgeDeletedNamespaces(ac.allKnownNamespaces, newKnownNamespaces, userSubjectRecordStore, groupSubjectRecordStore, reviewRecordStore)

// if we did a full rebuild, now we swap the fully rebuilt cache
// if we did a full rebuild, now we swap the fully rebuilt cache atomically
if invalidateCache {
ac.userSubjectRecordStore = userSubjectRecordStore
ac.groupSubjectRecordStore = groupSubjectRecordStore
ac.reviewRecordStore = reviewRecordStore
ac.stores.Store(&authorizationCacheStores{
userSubjectRecordStore: userSubjectRecordStore,
groupSubjectRecordStore: groupSubjectRecordStore,
reviewRecordStore: reviewRecordStore,
})
}
ac.allKnownNamespaces = newKnownNamespaces

Expand Down Expand Up @@ -511,14 +522,17 @@ func (ac *AuthorizationCache) List(userInfo user.Info, selector labels.Selector)
user := userInfo.GetName()
groups := userInfo.GetGroups()

obj, exists, _ := ac.userSubjectRecordStore.GetByKey(user)
// snapshot the stores pointer once so we read from a consistent pair
stores := ac.stores.Load()

obj, exists, _ := stores.userSubjectRecordStore.GetByKey(user)
if exists {
subjectRecord := obj.(*subjectRecord)
keys.Insert(subjectRecord.namespaces.List()...)
}

for _, group := range groups {
obj, exists, _ := ac.groupSubjectRecordStore.GetByKey(group)
obj, exists, _ := stores.groupSubjectRecordStore.GetByKey(group)
if exists {
subjectRecord := obj.(*subjectRecord)
keys.Insert(subjectRecord.namespaces.List()...)
Expand Down Expand Up @@ -600,10 +614,13 @@ func deleteNamespaceFromSubjects(subjectRecordStore cache.Store, subjects []stri
for _, subject := range subjects {
obj, exists, _ := subjectRecordStore.GetByKey(subject)
if exists {
subjectRecord := obj.(*subjectRecord)
delete(subjectRecord.namespaces, namespace)
if len(subjectRecord.namespaces) == 0 {
subjectRecordStore.Delete(subjectRecord)
old := obj.(*subjectRecord)
newNamespaces := sets.NewString(old.namespaces.UnsortedList()...)
newNamespaces.Delete(namespace)
if len(newNamespaces) == 0 {
subjectRecordStore.Delete(old)
} else {
subjectRecordStore.Update(&subjectRecord{subject: subject, namespaces: newNamespaces})
}
}
}
Expand All @@ -612,15 +629,15 @@ func deleteNamespaceFromSubjects(subjectRecordStore cache.Store, subjects []stri
// addSubjectsToNamespace adds the specified namespace to each subject
func addSubjectsToNamespace(subjectRecordStore cache.Store, subjects []string, namespace string) {
for _, subject := range subjects {
var item *subjectRecord
obj, exists, _ := subjectRecordStore.GetByKey(subject)
if exists {
item = obj.(*subjectRecord)
old := obj.(*subjectRecord)
newNamespaces := sets.NewString(old.namespaces.UnsortedList()...)
newNamespaces.Insert(namespace)
subjectRecordStore.Update(&subjectRecord{subject: subject, namespaces: newNamespaces})
} else {
item = &subjectRecord{subject: subject, namespaces: sets.NewString()}
subjectRecordStore.Add(item)
subjectRecordStore.Add(&subjectRecord{subject: subject, namespaces: sets.NewString(namespace)})
}
item.namespaces.Insert(namespace)
}
}

Expand Down
131 changes: 131 additions & 0 deletions pkg/project/auth/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package auth
import (
"fmt"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -436,3 +437,133 @@ func TestAuthorizationCache_invalidateCache(t *testing.T) {
})
}
}

func TestAuthorizationCacheRace(t *testing.T) {
namespaceList := corev1.NamespaceList{
Items: []corev1.Namespace{
{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "3"}},
},
}
mockKubeClient := fake.NewSimpleClientset(&namespaceList)

reviewer := &mockReviewer{
expectedResults: map[string]*mockReview{
"foo": {
users: []string{alice.GetName(), bob.GetName()},
groups: eve.GetGroups(),
},
"bar": {
users: []string{frank.GetName(), eve.GetName()},
groups: []string{"random"},
},
"baz": {
users: []string{alice.GetName()},
groups: []string{"employee"},
},
},
}

informers := informers.NewSharedInformerFactory(mockKubeClient, controller.NoResyncPeriodFunc())
nsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nsLister := corev1listers.NewNamespaceLister(nsIndexer)

authorizationCache := NewAuthorizationCache(
nsLister,
informers.Core().V1().Namespaces().Informer(),
reviewer,
informers.Rbac().V1(),
)
for i := range namespaceList.Items {
nsIndexer.Add(&namespaceList.Items[i])
}

// seed the cache
authorizationCache.synchronize()

stop := make(chan struct{})
var wg sync.WaitGroup

// writer goroutine: continuously synchronize, toggling access
wg.Add(1)
go func() {
defer wg.Done()
rv := 10
for {
select {
case <-stop:
return
default:
}
// toggle access patterns to force map mutations
if rv%2 == 0 {
reviewer.expectedResults["foo"] = &mockReview{
users: []string{alice.GetName(), bob.GetName()},
groups: eve.GetGroups(),
}
reviewer.expectedResults["baz"] = &mockReview{
users: []string{frank.GetName()},
groups: []string{},
}
} else {
reviewer.expectedResults["foo"] = &mockReview{
users: []string{frank.GetName()},
groups: []string{"random"},
}
reviewer.expectedResults["baz"] = &mockReview{
users: []string{alice.GetName(), eve.GetName()},
groups: []string{"employee"},
}
}
rv++
for i := range namespaceList.Items {
ns := namespaceList.Items[i]
ns.ResourceVersion = strconv.Itoa(rv)
nsIndexer.Update(&ns)
}
authorizationCache.synchronize()
}
}()

// reader goroutines: continuously call List
users := []user.Info{alice, bob, eve, frank}
for _, u := range users {
u := u
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
}
_, err := authorizationCache.List(u, labels.Everything())
if err != nil {
t.Errorf("List(%s) returned error: %v", u.GetName(), err)
return
}
}
}()
}

time.Sleep(2 * time.Second)
close(stop)
wg.Wait()
}

func BenchmarkAddSubjectsToNamespace(b *testing.B) {
for _, namespaceCount := range []int{10, 100, 1000} {
b.Run(fmt.Sprintf("namespaces=%d", namespaceCount), func(b *testing.B) {
subjects := []string{"alice", "bob", "eve", "frank", "grace"}
b.ResetTimer()
for i := 0; i < b.N; i++ {
store := cache.NewStore(subjectRecordKeyFn)
for ns := 0; ns < namespaceCount; ns++ {
addSubjectsToNamespace(store, subjects, fmt.Sprintf("namespace-%d", ns))
}
}
})
}
}