Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions pkg/project/apiserver/registry/project/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,26 @@ func (s *REST) Watch(ctx context.Context, options *metainternal.ListOptions) (wa
return nil, fmt.Errorf("no user")
}

includeAllExistingProjects := (options != nil) && options.ResourceVersion == "0"
// includeAllExistingProjects (RV="0") triggers sending initial state.
// sendBookmark (from SendInitialEvents) triggers sending a bookmark with
// k8s.io/initial-events-end annotation after initial events (if any).
includeAllExistingProjects, sendBookmark := false, false
if options != nil {
if options.ResourceVersion == "0" {
includeAllExistingProjects = true
}
if options.SendInitialEvents != nil && *options.SendInitialEvents {
sendBookmark = true
}
}

allowedNamespaces, err := scope.ScopesToVisibleNamespaces(userInfo.GetExtra()[authorizationapi.ScopesKey], s.authCache.GetClusterRoleLister(), true)
if err != nil {
return nil, err
}

m := projectutil.MatchProject(apihelpers.InternalListOptionsToSelectors(options))
watcher := projectauth.NewUserProjectWatcher(userInfo, allowedNamespaces, s.projectCache, s.authCache, includeAllExistingProjects, m)
watcher := projectauth.NewUserProjectWatcher(userInfo, allowedNamespaces, s.projectCache, s.authCache, includeAllExistingProjects, m, sendBookmark)
s.authCache.AddWatcher(watcher)

go watcher.Watch()
Expand Down
24 changes: 23 additions & 1 deletion pkg/project/auth/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type userProjectWatcher struct {
initialProjects []corev1.Namespace
// knownProjects maps name to resourceVersion
knownProjects map[string]string

sendBookmark bool
}

var (
Expand All @@ -72,7 +74,7 @@ var (
watchChannelHWM kstorage.HighWaterMark
)

func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projectCache *projectcache.ProjectCache, authCache WatchableCache, includeAllExistingProjects bool, predicate kstorage.SelectionPredicate) *userProjectWatcher {
func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projectCache *projectcache.ProjectCache, authCache WatchableCache, includeAllExistingProjects bool, predicate kstorage.SelectionPredicate, sendBookmark bool) *userProjectWatcher {
namespaces, _ := authCache.List(user, labels.Everything())
knownProjects := map[string]string{}
for _, namespace := range namespaces.Items {
Expand All @@ -98,6 +100,8 @@ func NewUserProjectWatcher(user user.Info, visibleNamespaces sets.String, projec
authCache: authCache,
initialProjects: initialProjects,
knownProjects: knownProjects,

sendBookmark: sendBookmark,
}
w.emit = func(e watch.Event) {
// if dealing with project events, ensure that we only emit events for projects
Expand Down Expand Up @@ -186,6 +190,13 @@ func (w *userProjectWatcher) GroupMembershipChanged(namespaceName string, users,

// Watch pulls stuff from etcd, converts, and pushes out the outgoing channel. Meant to be
// called as a goroutine.
//
// Design decision: This implementation balances KEP-3157 watch-list support with backward
// compatibility. Initial events are sent only when rv="0" (includeAllExistingProjects=true).
// For other rv values with SendInitialEvents=true, only the bookmark is sent. This approach
// acknowledges that project visibility depends on both namespace objects and RBAC state. Since
// RBAC changes don't update namespace ResourceVersions, permission-filtered views cannot provide
// the same consistency guarantees (resourceVersionMatch=NotOlderThan) as direct object watches.
func (w *userProjectWatcher) Watch() {
defer close(w.outgoing)
defer func() {
Expand Down Expand Up @@ -214,6 +225,17 @@ func (w *userProjectWatcher) Watch() {
})
}

if w.sendBookmark {
w.emit(watch.Event{
Type: watch.Bookmark,
Object: &projectapi.Project{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
},
},
})
}

for {
select {
case err := <-w.cacheError:
Expand Down
87 changes: 81 additions & 6 deletions pkg/project/auth/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
projectutil "github.com/openshift/openshift-apiserver/pkg/project/util"
)

func newTestWatcher(username string, groups []string, predicate storage.SelectionPredicate, namespaces ...*corev1.Namespace) (*userProjectWatcher, *fakeAuthCache, chan struct{}) {
func newTestWatcher(username string, groups []string, predicate storage.SelectionPredicate, includeAllExistingProjects bool, namespaces ...*corev1.Namespace) (*userProjectWatcher, *fakeAuthCache, chan struct{}) {
objects := []runtime.Object{}
for i := range namespaces {
objects = append(objects, namespaces[i])
Expand All @@ -37,11 +37,14 @@ func newTestWatcher(username string, groups []string, predicate storage.Selectio
"",
)
fakeAuthCache := &fakeAuthCache{}
if includeAllExistingProjects {
fakeAuthCache.namespaces = namespaces
}

stopCh := make(chan struct{})
go projectCache.Run(stopCh)

return NewUserProjectWatcher(&user.DefaultInfo{Name: username, Groups: groups}, sets.NewString("*"), projectCache, fakeAuthCache, false, predicate), fakeAuthCache, stopCh
return NewUserProjectWatcher(&user.DefaultInfo{Name: username, Groups: groups}, sets.NewString("*"), projectCache, fakeAuthCache, includeAllExistingProjects, predicate, false), fakeAuthCache, stopCh
}

type fakeAuthCache struct {
Expand All @@ -66,7 +69,7 @@ func (w *fakeAuthCache) List(userInfo user.Info, selector labels.Selector) (*cor
}

func TestFullIncoming(t *testing.T) {
watcher, fakeAuthCache, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), newNamespaces("ns-01")...)
watcher, fakeAuthCache, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), false, newNamespaces("ns-01")...)
defer close(stopCh)
watcher.cacheIncoming = make(chan watch.Event)

Expand Down Expand Up @@ -115,7 +118,7 @@ func TestFullIncoming(t *testing.T) {
}

func TestAddModifyDeleteEventsByUser(t *testing.T) {
watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), newNamespaces("ns-01")...)
watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), false, newNamespaces("ns-01")...)
defer close(stopCh)
go watcher.Watch()

Expand Down Expand Up @@ -158,7 +161,7 @@ func TestProjectSelectionPredicate(t *testing.T) {
field := fields.ParseSelectorOrDie("metadata.name=ns-03")
m := projectutil.MatchProject(labels.Everything(), field)

watcher, _, stopCh := newTestWatcher("bob", nil, m, newNamespaces("ns-01", "ns-02", "ns-03")...)
watcher, _, stopCh := newTestWatcher("bob", nil, m, false, newNamespaces("ns-01", "ns-02", "ns-03")...)
defer close(stopCh)

if watcher.emit == nil {
Expand Down Expand Up @@ -220,7 +223,7 @@ func TestProjectSelectionPredicate(t *testing.T) {
}

func TestAddModifyDeleteEventsByGroup(t *testing.T) {
watcher, _, stopCh := newTestWatcher("bob", []string{"group-one"}, matchAllPredicate(), newNamespaces("ns-01")...)
watcher, _, stopCh := newTestWatcher("bob", []string{"group-one"}, matchAllPredicate(), false, newNamespaces("ns-01")...)
defer close(stopCh)
go watcher.Watch()

Expand Down Expand Up @@ -271,3 +274,75 @@ func newNamespaces(names ...string) []*corev1.Namespace {
func matchAllPredicate() storage.SelectionPredicate {
return projectutil.MatchProject(labels.Everything(), fields.Everything())
}

func TestSendInitialEventsBookmark(t *testing.T) {
t.Run("with rv=0", func(t *testing.T) {
// rv="0" behavior: send initial events + bookmark
watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), true, newNamespaces("ns-01", "ns-02")...)
defer close(stopCh)

// Enable bookmark for watch-list
watcher.sendBookmark = true

go watcher.Watch()

// expect 2 initial Added events
for i := 0; i < 2; i++ {
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Added {
t.Errorf("expected Added, got %v", event.Type)
}
case <-time.After(3 * time.Second):
t.Fatalf("timeout waiting for initial event %d", i)
}
}

// expect bookmark with annotation
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Bookmark {
t.Errorf("expected Bookmark, got %v", event.Type)
}
project := event.Object.(*projectapi.Project)
if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" {
t.Errorf("expected initial-events-end annotation")
}
case <-time.After(3 * time.Second):
t.Fatalf("timeout waiting for bookmark")
}
})

t.Run("without rv=0", func(t *testing.T) {
// rv!="0" behavior: send bookmark only, no initial events
watcher, _, stopCh := newTestWatcher("bob", nil, matchAllPredicate(), false, newNamespaces("ns-01", "ns-02")...)
defer close(stopCh)

// Enable bookmark for watch-list
watcher.sendBookmark = true

go watcher.Watch()

// expect bookmark with annotation immediately
select {
case event := <-watcher.ResultChan():
if event.Type != watch.Bookmark {
t.Errorf("expected Bookmark, got %v", event.Type)
}
project := event.Object.(*projectapi.Project)
if project.Annotations[metav1.InitialEventsAnnotationKey] != "true" {
t.Errorf("expected initial-events-end annotation")
}
case <-time.After(3 * time.Second):
t.Fatalf("timeout waiting for bookmark")
}

// verify no additional events
select {
case event := <-watcher.ResultChan():
t.Fatalf("unexpected event after bookmark: %v", event)
case <-time.After(500 * time.Millisecond):
// expected - no more events
}
})
}