-
Notifications
You must be signed in to change notification settings - Fork 114
Fix concurrent map access race in project authorization cache #643
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
gangwgr
wants to merge
1
commit into
openshift:main
Choose a base branch
from
gangwgr:fix-concurrent-map-race
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,232 @@ | ||
| package auth | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "k8s.io/apimachinery/pkg/util/sets" | ||
| "k8s.io/client-go/tools/cache" | ||
| ) | ||
|
|
||
| // TestConcurrentMapAccess_NoRace is a regression test for OCPBUGS-XXXXX (TODO: replace the placeholder with the real bug ID). | ||
| // It runs 10 writer and 10 reader goroutines against one shared subjectRecord for two seconds so that unsynchronized access to subjectRecord.namespaces | ||
| // would surface as "fatal error: concurrent map iteration and map write"; it makes no assertions beyond running to completion. | ||
| func TestConcurrentMapAccess_NoRace(t *testing.T) { | ||
| // Create the store that holds the shared subjectRecord, keyed by subjectRecordKeyFn | ||
| subjectRecordStore := cache.NewStore(subjectRecordKeyFn) | ||
|
|
||
| // Seed the store with a single subject ("test-user") that already has three namespaces | ||
| subject := &subjectRecord{ | ||
| subject: "test-user", | ||
| namespaces: sets.NewString("ns-1", "ns-2", "ns-3"), | ||
| } | ||
| subjectRecordStore.Add(subject) | ||
|
|
||
| var wg sync.WaitGroup | ||
| done := make(chan bool) | ||
|
|
||
| // Writers: 10 goroutines repeatedly remove and re-add namespace "ns-<id%5>" for "test-user" until done is closed (like synchronize() does) | ||
| for i := 0; i < 10; i++ { | ||
| wg.Add(1) | ||
| go func(id int) { | ||
| defer wg.Done() | ||
| for { | ||
| select { | ||
| case <-done: | ||
| return | ||
| default: | ||
| // Drop the namespace from "test-user" through deleteNamespaceFromSubjects | ||
| deleteNamespaceFromSubjects(subjectRecordStore, []string{"test-user"}, fmt.Sprintf("ns-%d", id%5)) | ||
| time.Sleep(1 * time.Millisecond) | ||
| // Re-add the same namespace through addSubjectsToNamespace after the 1ms pause above | ||
| addSubjectsToNamespace(subjectRecordStore, []string{"test-user"}, fmt.Sprintf("ns-%d", id%5)) | ||
| } | ||
| } | ||
| }(i) | ||
| } | ||
|
|
||
| // Readers: 10 goroutines repeatedly list the record's namespaces under its read lock until done is closed (like List() does) | ||
| for i := 0; i < 10; i++ { | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| for { | ||
| select { | ||
| case <-done: | ||
| return | ||
| default: | ||
| // Look the record up by key and read subjectRecord.namespaces while holding mu.RLock | ||
| obj, exists, _ := subjectRecordStore.GetByKey("test-user") | ||
| if exists { | ||
| subjectRecord := obj.(*subjectRecord) | ||
| subjectRecord.mu.RLock() | ||
| _ = subjectRecord.namespaces.List() | ||
| subjectRecord.mu.RUnlock() | ||
| } | ||
| time.Sleep(1 * time.Millisecond) | ||
| } | ||
| } | ||
| }() | ||
| } | ||
|
|
||
| // Let the readers and writers contend for 2 seconds, then signal shutdown and wait for every goroutine to exit | ||
| time.Sleep(2 * time.Second) | ||
| close(done) | ||
| wg.Wait() | ||
|
|
||
| t.Log("Race condition test passed - no concurrent map access panic") | ||
| } | ||
|
|
||
| // TestAuthorizationCache_ListConcurrentAccess exercises the read pattern attributed to List() (a locked read of each record's namespaces, merged into one set) | ||
| // against separate user and group subject record stores while 5 writer goroutines mutate the same records; it does not call AuthorizationCache.List itself. | ||
| func TestAuthorizationCache_ListConcurrentAccess(t *testing.T) { | ||
| // Build the user and group subject record stores directly; no AuthorizationCache is constructed in this test | ||
| userSubjectRecordStore := cache.NewStore(subjectRecordKeyFn) | ||
| groupSubjectRecordStore := cache.NewStore(subjectRecordKeyFn) | ||
|
|
||
| // Seed one user record ("test-user") and one group record ("system:authenticated"), each with a few namespaces | ||
| userRecord := &subjectRecord{ | ||
| subject: "test-user", | ||
| namespaces: sets.NewString("default", "kube-system", "openshift"), | ||
| } | ||
| userSubjectRecordStore.Add(userRecord) | ||
|
|
||
| groupRecord := &subjectRecord{ | ||
| subject: "system:authenticated", | ||
| namespaces: sets.NewString("test-ns-1", "test-ns-2"), | ||
| } | ||
| groupSubjectRecordStore.Add(groupRecord) | ||
|
|
||
| var wg sync.WaitGroup | ||
| done := make(chan bool) | ||
|
|
||
| // Writers: 5 goroutines repeatedly delete and re-add one namespace on the user record and one on the group record until done is closed | ||
| for i := 0; i < 5; i++ { | ||
| wg.Add(1) | ||
| go func(id int) { | ||
| defer wg.Done() | ||
| for { | ||
| select { | ||
| case <-done: | ||
| return | ||
| default: | ||
| deleteNamespaceFromSubjects(userSubjectRecordStore, []string{"test-user"}, "default") | ||
| addSubjectsToNamespace(userSubjectRecordStore, []string{"test-user"}, "default") | ||
| deleteNamespaceFromSubjects(groupSubjectRecordStore, []string{"system:authenticated"}, "test-ns-1") | ||
| addSubjectsToNamespace(groupSubjectRecordStore, []string{"system:authenticated"}, "test-ns-1") | ||
| time.Sleep(1 * time.Millisecond) | ||
| } | ||
| } | ||
| }(i) | ||
| } | ||
|
|
||
| // Readers: 10 goroutines repeatedly merge both records' namespaces under their read locks (mimicking the List() method's behavior) | ||
| for i := 0; i < 10; i++ { | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| for { | ||
| select { | ||
| case <-done: | ||
| return | ||
| default: | ||
| // This mimics what List() does at lines 517 and 524 (NOTE(review): those line numbers refer to the file that defines List() and may have drifted; confirm) | ||
| keys := sets.String{} | ||
|
|
||
| obj, exists, _ := userSubjectRecordStore.GetByKey("test-user") | ||
| if exists { | ||
| subjectRecord := obj.(*subjectRecord) | ||
| subjectRecord.mu.RLock() | ||
| keys.Insert(subjectRecord.namespaces.List()...) | ||
| subjectRecord.mu.RUnlock() | ||
| } | ||
|
|
||
| obj, exists, _ = groupSubjectRecordStore.GetByKey("system:authenticated") | ||
| if exists { | ||
| subjectRecord := obj.(*subjectRecord) | ||
| subjectRecord.mu.RLock() | ||
| keys.Insert(subjectRecord.namespaces.List()...) | ||
| subjectRecord.mu.RUnlock() | ||
| } | ||
|
|
||
| _ = keys.List() | ||
| time.Sleep(1 * time.Millisecond) | ||
| } | ||
| } | ||
| }() | ||
| } | ||
|
|
||
| // Let the goroutines contend for 3 seconds, then signal shutdown and wait for every goroutine to exit | ||
| time.Sleep(3 * time.Second) | ||
| close(done) | ||
| wg.Wait() | ||
|
|
||
| t.Log("Authorization cache List() concurrent access test passed") | ||
| } | ||
|
|
||
| // TestSubjectRecordConcurrentModification hammers a single subjectRecord with 20 writer and 20 reader goroutines (100 iterations each), | ||
| // taking record.mu around every access to record.namespaces, and fails if any goroutine reports a recovered panic. | ||
| func TestSubjectRecordConcurrentModification(t *testing.T) { | ||
| record := &subjectRecord{ | ||
| subject: "concurrent-test", | ||
| namespaces: sets.NewString(), | ||
| } | ||
|
|
||
| var wg sync.WaitGroup | ||
| errors := make(chan error, 100) | ||
|
|
||
| // Concurrent writers: each inserts a namespace unique to (writer, iteration) under the write lock, pauses 100 microseconds, then deletes it under the write lock | ||
| for i := 0; i < 20; i++ { | ||
| wg.Add(1) | ||
| go func(id int) { | ||
| defer wg.Done() | ||
| defer func() { | ||
| if r := recover(); r != nil { | ||
| errors <- fmt.Errorf("writer %d panicked: %v", id, r) | ||
| } | ||
| }() | ||
| for j := 0; j < 100; j++ { | ||
| ns := fmt.Sprintf("namespace-%d-%d", id, j) | ||
| record.mu.Lock() | ||
| record.namespaces.Insert(ns) | ||
| record.mu.Unlock() | ||
| time.Sleep(100 * time.Microsecond) | ||
| record.mu.Lock() | ||
| delete(record.namespaces, ns) | ||
| record.mu.Unlock() | ||
| } | ||
| }(i) | ||
| } | ||
|
|
||
| // Concurrent readers: each lists and counts the namespaces under the read lock, pausing 100 microseconds between iterations | ||
| for i := 0; i < 20; i++ { | ||
| wg.Add(1) | ||
| go func(id int) { | ||
| defer wg.Done() | ||
| defer func() { | ||
| if r := recover(); r != nil { | ||
| errors <- fmt.Errorf("reader %d panicked: %v", id, r) | ||
| } | ||
| }() | ||
| for j := 0; j < 100; j++ { | ||
| record.mu.RLock() | ||
| _ = record.namespaces.List() | ||
| _ = record.namespaces.Len() | ||
| record.mu.RUnlock() | ||
| time.Sleep(100 * time.Microsecond) | ||
| } | ||
| }(i) | ||
| } | ||
|
|
||
| wg.Wait() | ||
| close(errors) | ||
|
|
||
| // Fail on the first recovered panic. NOTE(review): Go's "concurrent map" fatal errors cannot be intercepted by recover(), so run this test with -race to actually detect unsynchronized access | ||
| for err := range errors { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| t.Log("Concurrent modification test passed - no panics or race conditions") | ||
| } |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard the store pointers as well as the per-subject maps.
This fixes the `subjectRecord.namespaces` race, but `List()` still reads `ac.userSubjectRecordStore` / `ac.groupSubjectRecordStore` concurrently with `synchronize()` swapping those fields during cache invalidation. That leaves an unsynchronized read/write on the store references and can also give `List()` a mixed old/new snapshot.
Suggested fix
```diff
 type AuthorizationCache struct {
+	storeMu sync.RWMutex
+
 	reviewRecordStore       cache.Store
 	userSubjectRecordStore  cache.Store
 	groupSubjectRecordStore cache.Store
 	...
 }

 func (ac *AuthorizationCache) List(userInfo user.Info, selector labels.Selector) (*corev1.NamespaceList, error) {
+	ac.storeMu.RLock()
+	userStore := ac.userSubjectRecordStore
+	groupStore := ac.groupSubjectRecordStore
+	ac.storeMu.RUnlock()
+
 	keys := sets.String{}
 	user := userInfo.GetName()
 	groups := userInfo.GetGroups()

-	obj, exists, _ := ac.userSubjectRecordStore.GetByKey(user)
+	obj, exists, _ := userStore.GetByKey(user)
 	if exists {
 		subjectRecord := obj.(*subjectRecord)
 		subjectRecord.mu.RLock()
 		keys.Insert(subjectRecord.namespaces.List()...)
 		subjectRecord.mu.RUnlock()
 	}

 	for _, group := range groups {
-		obj, exists, _ := ac.groupSubjectRecordStore.GetByKey(group)
+		obj, exists, _ := groupStore.GetByKey(group)
 		if exists {
 			subjectRecord := obj.(*subjectRecord)
 			subjectRecord.mu.RLock()
 			keys.Insert(subjectRecord.namespaces.List()...)
 			subjectRecord.mu.RUnlock()
 		}
 	}
 }

 // inside synchronize()
 if invalidateCache {
+	ac.storeMu.Lock()
 	ac.userSubjectRecordStore = userSubjectRecordStore
 	ac.groupSubjectRecordStore = groupSubjectRecordStore
 	ac.reviewRecordStore = reviewRecordStore
+	ac.storeMu.Unlock()
 }
```

Also applies to: 527-529
🤖 Prompt for AI Agents