From 8ea5b98a1c86fcb5ef2b4aaa6b6679205ad03245 Mon Sep 17 00:00:00 2001 From: Mangirdas Judeikis Date: Tue, 5 May 2026 16:01:50 +0300 Subject: [PATCH 1/2] Split annotation for multiple-writers --- .../crdnooverlappinggvr_admission.go | 32 ++- .../crdnooverlappinggvr_admission_test.go | 15 +- .../apis/apibinding/apibinding_controller.go | 10 +- .../apis/apibinding/apibinding_reconcile.go | 24 +- .../apibinding/apibinding_reconcile_test.go | 21 +- .../apis/apibinding/logical_cluster_lock.go | 233 +++++++++++++++++- .../logicalclustercleanup_controller.go | 145 ++++++----- .../logicalclustercleanup_controller_test.go | 28 ++- .../dynamicrestmapper_dynamic_controller.go | 22 +- pkg/virtual/replication/authorizer/content.go | 2 +- .../replication/authorizer/content_test.go | 4 +- 11 files changed, 432 insertions(+), 104 deletions(-) diff --git a/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission.go b/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission.go index 31c4f6715e8..2409e9dbcaf 100644 --- a/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission.go +++ b/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission.go @@ -18,6 +18,7 @@ package crdnooverlappinggvr import ( "context" + "encoding/json" "fmt" "io" "strings" @@ -27,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/client-go/util/retry" @@ -80,7 +82,33 @@ func (p *crdNoOverlappingGVRAdmission) SetKcpInformers(local, global kcpinformer // SetKcpClusterClient sets the client for kcp resources. It's part of WantsKcpClusterClient. func (p *crdNoOverlappingGVRAdmission) SetKcpClusterClient(c kcpclientset.ClusterInterface) { p.updateLogicalCluster = func(ctx context.Context, logicalCluster *corev1alpha1.LogicalCluster, opts metav1.UpdateOptions) (*corev1alpha1.LogicalCluster, error) { - return c.CoreV1alpha1().LogicalClusters().Cluster(logicalcluster.From(logicalCluster).Path()).Update(ctx, logicalCluster, opts) + // Use SSA to write only to the pending locks annotation key + patchObj := &corev1alpha1.LogicalCluster{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1alpha1.SchemeGroupVersion.String(), + Kind: "LogicalCluster", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: logicalCluster.Name, + Annotations: map[string]string{ + apibinding.LocksPendingAnnotationKey: logicalCluster.Annotations[apibinding.LocksPendingAnnotationKey], + }, + }, + } + patchBytes, err := json.Marshal(patchObj) + if err != nil { + return nil, err + } + return c.CoreV1alpha1().LogicalClusters().Cluster(logicalcluster.From(logicalCluster).Path()).Patch( + ctx, + logicalCluster.Name, + types.ApplyPatchType, + patchBytes, + metav1.PatchOptions{ + FieldManager: apibinding.FieldManagerPending, + // Force is not needed because each writer uses its own annotation key + }, + ) } } @@ -136,7 +164,7 @@ func (p *crdNoOverlappingGVRAdmission) Validate(ctx context.Context, a admission } var updated *corev1alpha1.LogicalCluster - updated, _, skipped, err = apibinding.WithLockedResources(nil, time.Now(), lc, []schema.GroupResource{gr}, apibinding.ExpirableLock{ + updated, _, skipped, err = apibinding.WithLockedResourcesForPending(nil, time.Now(), lc, []schema.GroupResource{gr}, apibinding.ExpirableLock{ Lock: apibinding.Lock{CRD: true}, CRDExpiry: ptr.To(p.now()), }) diff --git a/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission_test.go b/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission_test.go index 31bbae875a9..7bc11a02da5 100644 --- a/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission_test.go +++ b/pkg/admission/crdnooverlappinggvr/crdnooverlappinggvr_admission_test.go @@ -140,7 +140,7 @@ func TestValidate(t *testing.T) { }, { - name: "fails without resource binding annotation on LogicalCluster", + name: "succeeds without resource binding annotation on LogicalCluster - creates pending lock", attr: createAttr(&apiextensions.CustomResourceDefinition{ ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: apiextensions.CustomResourceDefinitionSpec{ @@ -150,7 +150,7 @@ func TestValidate(t *testing.T) { }), clusterName: "root:acme", initialObjects: []runtime.Object{createLogicalCluster("root:acme")}, - wantErr: true, + wantAnnotation: `{"foo.acme.dev":{"c":true,"e":"2022-01-01T00:00:00Z"}}`, }, { name: "fails without LogicalCluster", @@ -241,7 +241,8 @@ func TestValidate(t *testing.T) { if updatedLogicalCluster == nil { t.Fatal("expected LogicalCluster to be updated, got nil") } - if got := updatedLogicalCluster.Annotations[apibinding.ResourceBindingsAnnotationKey]; got != scenario.wantAnnotation { + // With SSA split, the admission plugin writes to the pending locks annotation key + if got := updatedLogicalCluster.Annotations[apibinding.LocksPendingAnnotationKey]; got != scenario.wantAnnotation { t.Errorf("expected LogicalCluster annotation %q, got %q", scenario.wantAnnotation, got) } } @@ -310,7 +311,7 @@ func createLogicalCluster(clusterName string) *corev1alpha1.LogicalCluster { func withCRD(lc *corev1alpha1.LogicalCluster, gr schema.GroupResource, expiry *metav1.Time) *corev1alpha1.LogicalCluster { rbs := make(apibinding.ResourceBindingsAnnotation) - if v := lc.Annotations[apibinding.ResourceBindingsAnnotationKey]; v != "" { + if v := lc.Annotations[apibinding.LocksPendingAnnotationKey]; v != "" { if err := json.Unmarshal([]byte(v), &rbs); err != nil { panic(err) } @@ -325,13 +326,13 @@ func withCRD(lc *corev1alpha1.LogicalCluster, gr schema.GroupResource, expiry *m if err != nil { panic(err) } - lc.Annotations[apibinding.ResourceBindingsAnnotationKey] = string(bs) + lc.Annotations[apibinding.LocksPendingAnnotationKey] = string(bs) return lc } func withBinding(lc *corev1alpha1.LogicalCluster, binding string, boundResources []apisv1alpha2.BoundAPIResource) *corev1alpha1.LogicalCluster { rbs := make(apibinding.ResourceBindingsAnnotation) - if v := lc.Annotations[apibinding.ResourceBindingsAnnotationKey]; v != "" { + if v := lc.Annotations[apibinding.LocksPendingAnnotationKey]; v != "" { if err := json.Unmarshal([]byte(v), &rbs); err != nil { panic(err) } @@ -347,6 +348,6 @@ func withBinding(lc *corev1alpha1.LogicalCluster, binding string, boundResources if err != nil { panic(err) } - lc.Annotations[apibinding.ResourceBindingsAnnotationKey] = string(bs) + lc.Annotations[apibinding.LocksPendingAnnotationKey] = string(bs) return lc } diff --git a/pkg/reconciler/apis/apibinding/apibinding_controller.go b/pkg/reconciler/apis/apibinding/apibinding_controller.go index 96b0bf2c9cf..9bb8898e46d 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_controller.go +++ b/pkg/reconciler/apis/apibinding/apibinding_controller.go @@ -37,7 +37,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" - "k8s.io/utils/ptr" kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" kcpapiextensionsclientset "github.com/kcp-dev/client-go/apiextensions/client" @@ -63,8 +62,7 @@ import ( ) const ( - ControllerName = "kcp-apibinding" - LocksFieldManager = "apibinding-reconciler-locks" + ControllerName = "kcp-apibinding" ) var ( @@ -171,7 +169,7 @@ func NewController( ObjectMeta: metav1.ObjectMeta{ Name: lc.Name, Annotations: map[string]string{ - ResourceBindingsAnnotationKey: lc.Annotations[ResourceBindingsAnnotationKey], + LocksBindingsAnnotationKey: lc.Annotations[LocksBindingsAnnotationKey], }, }, } @@ -185,8 +183,8 @@ func NewController( types.ApplyPatchType, patchBytes, metav1.PatchOptions{ - FieldManager: LocksFieldManager, - Force: ptr.To(true), + FieldManager: FieldManagerBindings, + // Force is not needed because each writer uses its own annotation key }, ) return err diff --git a/pkg/reconciler/apis/apibinding/apibinding_reconcile.go b/pkg/reconciler/apis/apibinding/apibinding_reconcile.go index 0542d879be8..0f85f2312ab 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_reconcile.go +++ b/pkg/reconciler/apis/apibinding/apibinding_reconcile.go @@ -280,8 +280,12 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp // TODO(sttts): removed schemas never get unlocked. We need a distinguishable way // for intentional removal of schemas, versus movement of schemas // to another APIExport. - previousResourceBindings := lc.Annotations[ResourceBindingsAnnotationKey] - lc, _, skipped, err = WithLockedResources(crds, time.Now(), lc, grs.UnsortedList(), ExpirableLock{ + // Read current bindings annotation to check if we need to update + bindingsBefore, err := GetBindingsLocks(lc) + if err != nil { + return err + } + lc, _, skipped, err = WithLockedResourcesForBindings(crds, time.Now(), lc, grs.UnsortedList(), ExpirableLock{ Lock: Lock{Name: apiBinding.Name}, }) if err != nil { @@ -311,7 +315,21 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp return err } - if lc.Annotations[ResourceBindingsAnnotationKey] != previousResourceBindings { + // Check if bindings annotation changed by comparing before and after + bindingsAfter, err := GetBindingsLocks(lc) + if err != nil { + return err + } + bindingsChanged := len(bindingsBefore) != len(bindingsAfter) + if !bindingsChanged { + for k, v := range bindingsBefore { + if v2, ok := bindingsAfter[k]; !ok || v != v2 { + bindingsChanged = true + break + } + } + } + if bindingsChanged { if err := r.updateLogicalCluster(ctx, lc); err != nil { return err } diff --git a/pkg/reconciler/apis/apibinding/apibinding_reconcile_test.go b/pkg/reconciler/apis/apibinding/apibinding_reconcile_test.go index 873ce0ab3b9..4b1e83d1bb2 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_reconcile_test.go +++ b/pkg/reconciler/apis/apibinding/apibinding_reconcile_test.go @@ -285,13 +285,21 @@ func TestReconcileBinding(t *testing.T) { }, wantError: true, }, - "LogicalCluster without resource binding annotation": { - logicalCluster: newLogicalCluster(), + "LogicalCluster with empty resource binding annotation - binding succeeds": { + // With split annotations, the controller can now lock resources even when + // starting with an empty bindings annotation. This tests the successful + // binding flow with the new SSA-based split annotation design. + logicalCluster: withResourceBindings(newLogicalCluster(), ResourceBindingsAnnotation{}), apiBinding: binding.Build(), + wantCreateCRD: true, + wantAPIExportValid: wantAPIExportValid{ + valid: true, + }, wantInitialBindingComplete: wantInitialBindingComplete{ - resourceConflict: true, + // CRD is created, so we wait for it to be established + waitingForEstablished: true, }, - wantError: true, + wantBoundResources: nil, // not yet established }, "LogicalCluster update error": { logicalCluster: withResourceBindings(newLogicalCluster(), ResourceBindingsAnnotation{}), @@ -730,7 +738,7 @@ func TestReconcileBinding(t *testing.T) { lc.TypeMeta = metav1.TypeMeta{} } if tc.wantUpdatedResourceBindings != nil { - got, err := GetResourceBindings(lc) + got, err := GetBindingsLocks(lc) require.NoError(t, err) require.Equal(t, tc.wantUpdatedResourceBindings, got) } @@ -1329,7 +1337,8 @@ func withResourceBindings(lc *corev1alpha1.LogicalCluster, rbs ResourceBindingsA lc = lc.DeepCopy() lc.Annotations = make(map[string]string) - lc.Annotations[ResourceBindingsAnnotationKey] = string(bs) + // Use the new bindings annotation key for tests + lc.Annotations[LocksBindingsAnnotationKey] = string(bs) return lc } diff --git a/pkg/reconciler/apis/apibinding/logical_cluster_lock.go b/pkg/reconciler/apis/apibinding/logical_cluster_lock.go index f9963f14b6c..b0e6914216f 100644 --- a/pkg/reconciler/apis/apibinding/logical_cluster_lock.go +++ b/pkg/reconciler/apis/apibinding/logical_cluster_lock.go @@ -32,9 +32,30 @@ import ( ) const ( - // ResourceBindingsAnnotationKey is the key for the annotation on the LogicalCluster - // to hold ResourceBindings. + // ResourceBindingsAnnotationKey is the legacy key for the annotation on the LogicalCluster + // to hold ResourceBindings. This is kept for backward compatibility during migration. + // + // Deprecated: Use the new split annotation keys instead. ResourceBindingsAnnotationKey = "internal.apis.kcp.io/resource-bindings" + + // LocksBindingsAnnotationKey is the annotation key for APIBinding lock entries. + // Written by the apibinding controller. + LocksBindingsAnnotationKey = "internal.apis.kcp.io/locks-bindings" + // LocksCRDsAnnotationKey is the annotation key for CRD lock entries. + // Written by the logicalclustercleanup controller. + LocksCRDsAnnotationKey = "internal.apis.kcp.io/locks-crds" + // LocksPendingAnnotationKey is the annotation key for pending CRD locks. + // Written by the crdnooverlappinggvr admission plugin. + LocksPendingAnnotationKey = "internal.apis.kcp.io/locks-pending" +) + +const ( + // FieldManagerBindings is the SSA field manager for the apibinding controller. + FieldManagerBindings = "kcp-apibinding" + // FieldManagerCRDs is the SSA field manager for the logicalclustercleanup controller. + FieldManagerCRDs = "kcp-logicalclustercleanup" + // FieldManagerPending is the SSA field manager for the crdnooverlappinggvr admission plugin. + FieldManagerPending = "kcp-crdnooverlappinggvr" ) // Lock is a lock for a resource, part of the apis.kcp.io/resource-bindings annotation. @@ -75,6 +96,9 @@ func UnmarshalResourceBindingsAnnotation(ann string) (ResourceBindingsAnnotation } // GetResourceBindings reads ResourceBindingsAnnotation from LogicalCluster's annotation. +// For backward compatibility, it reads from the legacy annotation key if the new keys are not present. +// +// Deprecated: Use GetAllResourceBindings instead which merges all three annotation sources. func GetResourceBindings(lc *corev1alpha1.LogicalCluster) (ResourceBindingsAnnotation, error) { const jsonEmptyObj = "{}" @@ -86,6 +110,79 @@ func GetResourceBindings(lc *corev1alpha1.LogicalCluster) (ResourceBindingsAnnot return UnmarshalResourceBindingsAnnotation(ann) } +// GetBindingsLocks reads the bindings annotation from LogicalCluster. +func GetBindingsLocks(lc *corev1alpha1.LogicalCluster) (ResourceBindingsAnnotation, error) { + ann := lc.Annotations[LocksBindingsAnnotationKey] + if ann == "" { + return make(ResourceBindingsAnnotation), nil + } + return UnmarshalResourceBindingsAnnotation(ann) +} + +// GetCRDsLocks reads the CRDs annotation from LogicalCluster. +func GetCRDsLocks(lc *corev1alpha1.LogicalCluster) (ResourceBindingsAnnotation, error) { + ann := lc.Annotations[LocksCRDsAnnotationKey] + if ann == "" { + return make(ResourceBindingsAnnotation), nil + } + return UnmarshalResourceBindingsAnnotation(ann) +} + +// GetPendingLocks reads the pending locks annotation from LogicalCluster. +func GetPendingLocks(lc *corev1alpha1.LogicalCluster) (ResourceBindingsAnnotation, error) { + ann := lc.Annotations[LocksPendingAnnotationKey] + if ann == "" { + return make(ResourceBindingsAnnotation), nil + } + return UnmarshalResourceBindingsAnnotation(ann) +} + +// GetAllResourceBindings merges all three lock annotations (crds, bindings, pending) +// into a single ResourceBindingsAnnotation with precedence: crds > bindings > pending. +// This allows readers to see a unified view while writers can use SSA on their own keys. +func GetAllResourceBindings(lc *corev1alpha1.LogicalCluster) (ResourceBindingsAnnotation, error) { + result := make(ResourceBindingsAnnotation) + + // First, read from legacy annotation for backward compatibility during migration + legacy, err := GetResourceBindings(lc) + if err != nil { + return nil, err + } + for k, v := range legacy { + result[k] = v + } + + // Then, read from new annotations with precedence: crds > bindings > pending + // Pending locks have lowest priority (they're provisional) + pending, err := GetPendingLocks(lc) + if err != nil { + return nil, err + } + for k, v := range pending { + result[k] = v + } + + // Binding locks override pending locks + bindings, err := GetBindingsLocks(lc) + if err != nil { + return nil, err + } + for k, v := range bindings { + result[k] = v + } + + // CRD locks have highest priority + crds, err := GetCRDsLocks(lc) + if err != nil { + return nil, err + } + for k, v := range crds { + result[k] = v + } + + return result, nil +} + // WithLockedResources tries to lock the resources for the given binding. It // returns those resources that got successfully locked. If a resource is already // locked by another binding, it is skipped and returned in the second return @@ -186,3 +283,135 @@ func unlockResource(ctx context.Context, kcpClusterClient kcpclientset.ClusterIn return nil } + +// WithLockedResourcesForBindings locks resources for the APIBinding controller. +// It reads from the merged view of all annotations but only writes to the bindings annotation. +// This allows the apibinding controller to use SSA on its own key. +func WithLockedResourcesForBindings(crds []*apiextensionsv1.CustomResourceDefinition, now time.Time, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding ExpirableLock) (*corev1alpha1.LogicalCluster, []schema.GroupResource, map[schema.GroupResource]Lock, error) { + // Read from all sources to check locks + rbs, err := GetAllResourceBindings(lc) + if err != nil { + return nil, nil, nil, err + } + + crdNames := make(map[string]bool, len(crds)) + for _, crd := range crds { + crdNames[crd.Name] = true + } + + // Read the bindings-specific annotation to update + bindingsRbs, err := GetBindingsLocks(lc) + if err != nil { + return nil, nil, nil, err + } + + // find what resources need to be newly locked + skipped := make(map[schema.GroupResource]Lock) + newlyLocked := make([]schema.GroupResource, 0, len(grs)) + locked := make([]schema.GroupResource, 0, len(grs)) + for _, gr := range grs { + b, found := rbs[gr.String()] + if !found || b.Lock == binding.Lock { + newlyLocked = append(newlyLocked, gr) + locked = append(locked, gr) + continue + } + if b.CRD && !crdNames[gr.String()] && b.CRDExpiry != nil && now.After(b.CRDExpiry.Time) { + // CRD binding expired, and CRD is not known + newlyLocked = append(newlyLocked, gr) + locked = append(locked, gr) + continue + } + skipped[gr] = b.Lock + } + + // don't do anything if no resources need to be locked + if len(newlyLocked) == 0 { + return lc, locked, skipped, nil + } + + // update only the bindings annotation + for _, gr := range newlyLocked { + bindingsRbs[gr.String()] = binding + } + lc = lc.DeepCopy() + bs, err := json.Marshal(bindingsRbs) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) + } + if lc.Annotations == nil { + lc.Annotations = make(map[string]string) + } + lc.Annotations[LocksBindingsAnnotationKey] = string(bs) + + return lc, locked, skipped, nil +} + +// GetAllResourceBindingsForRead returns all resource bindings for reading purposes. +// This is a convenience alias for GetAllResourceBindings. +func GetAllResourceBindingsForRead(lc *corev1alpha1.LogicalCluster) (ResourceBindingsAnnotation, error) { + return GetAllResourceBindings(lc) +} + +// WithLockedResourcesForPending locks resources for the admission plugin (pending locks). +// It reads from the merged view of all annotations but only writes to the pending annotation. +// This allows the admission plugin to use SSA on its own key. +func WithLockedResourcesForPending(crds []*apiextensionsv1.CustomResourceDefinition, now time.Time, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding ExpirableLock) (*corev1alpha1.LogicalCluster, []schema.GroupResource, map[schema.GroupResource]Lock, error) { + // Read from all sources to check locks + rbs, err := GetAllResourceBindings(lc) + if err != nil { + return nil, nil, nil, err + } + + crdNames := make(map[string]bool, len(crds)) + for _, crd := range crds { + crdNames[crd.Name] = true + } + + // Read the pending-specific annotation to update + pendingRbs, err := GetPendingLocks(lc) + if err != nil { + return nil, nil, nil, err + } + + // find what resources need to be newly locked + skipped := make(map[schema.GroupResource]Lock) + newlyLocked := make([]schema.GroupResource, 0, len(grs)) + locked := make([]schema.GroupResource, 0, len(grs)) + for _, gr := range grs { + b, found := rbs[gr.String()] + if !found || b.Lock == binding.Lock { + newlyLocked = append(newlyLocked, gr) + locked = append(locked, gr) + continue + } + if b.CRD && !crdNames[gr.String()] && b.CRDExpiry != nil && now.After(b.CRDExpiry.Time) { + // CRD binding expired, and CRD is not known + newlyLocked = append(newlyLocked, gr) + locked = append(locked, gr) + continue + } + skipped[gr] = b.Lock + } + + // don't do anything if no resources need to be locked + if len(newlyLocked) == 0 { + return lc, locked, skipped, nil + } + + // update only the pending annotation + for _, gr := range newlyLocked { + pendingRbs[gr.String()] = binding + } + lc = lc.DeepCopy() + bs, err := json.Marshal(pendingRbs) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) + } + if lc.Annotations == nil { + lc.Annotations = make(map[string]string) + } + lc.Annotations[LocksPendingAnnotationKey] = string(bs) + + return lc, locked, skipped, nil +} diff --git a/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller.go b/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller.go index 8f6f6f7b463..22aa6a8dd27 100644 --- a/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller.go +++ b/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller.go @@ -18,6 +18,7 @@ package logicalclustercleanup import ( "context" + "encoding/json" "fmt" "time" @@ -27,7 +28,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/types" + utiljson "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" @@ -69,8 +71,34 @@ func NewController( getLogicalCluster: func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) { return logicalClusterInformer.Lister().Cluster(clusterName).Get(corev1alpha1.LogicalClusterName) }, - updateLogicalCluster: func(ctx context.Context, lc *corev1alpha1.LogicalCluster) error { - _, err := kcpClusterClient.CoreV1alpha1().Cluster(logicalcluster.From(lc).Path()).LogicalClusters().Update(ctx, lc, metav1.UpdateOptions{}) + updateLogicalClusterCRDs: func(ctx context.Context, lc *corev1alpha1.LogicalCluster) error { + // Use SSA to write only to the CRDs annotation key + patchObj := &corev1alpha1.LogicalCluster{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1alpha1.SchemeGroupVersion.String(), + Kind: "LogicalCluster", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: lc.Name, + Annotations: map[string]string{ + apibindingreconciler.LocksCRDsAnnotationKey: lc.Annotations[apibindingreconciler.LocksCRDsAnnotationKey], + }, + }, + } + patchBytes, err := json.Marshal(patchObj) + if err != nil { + return err + } + _, err = kcpClusterClient.CoreV1alpha1().Cluster(logicalcluster.From(lc).Path()).LogicalClusters().Patch( + ctx, + lc.Name, + types.ApplyPatchType, + patchBytes, + metav1.PatchOptions{ + FieldManager: apibindingreconciler.FieldManagerCRDs, + // Force is not needed because each writer uses its own annotation key + }, + ) return err }, listsCRDs: func(clusterName logicalcluster.Name) ([]*apiextensionsv1.CustomResourceDefinition, error) { @@ -124,10 +152,10 @@ func NewController( type controller struct { queue workqueue.TypedRateLimitingInterface[string] - getLogicalCluster func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) - updateLogicalCluster func(ctx context.Context, logicalCluster *corev1alpha1.LogicalCluster) error - listsCRDs func(clusterName logicalcluster.Name) ([]*apiextensionsv1.CustomResourceDefinition, error) - listAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha2.APIBinding, error) + getLogicalCluster func(clusterName logicalcluster.Name) (*corev1alpha1.LogicalCluster, error) + updateLogicalClusterCRDs func(ctx context.Context, logicalCluster *corev1alpha1.LogicalCluster) error + listsCRDs func(clusterName logicalcluster.Name) ([]*apiextensionsv1.CustomResourceDefinition, error) + listAPIBindings func(clusterName logicalcluster.Name) ([]*apisv1alpha2.APIBinding, error) } // enqueueLogicalCluster enqueues a LogicalCluster. @@ -226,12 +254,12 @@ func (c *controller) process(ctx context.Context, key string) error { logger := logging.WithObject(klog.FromContext(ctx), lc) - // decode existing annotation. + // decode existing CRD annotation - this controller only manages CRD entries. rbs := make(apibindingreconciler.ResourceBindingsAnnotation) - annValue, found := lc.Annotations[apibindingreconciler.ResourceBindingsAnnotationKey] + annValue, found := lc.Annotations[apibindingreconciler.LocksCRDsAnnotationKey] if found { - if err := json.Unmarshal([]byte(annValue), &rbs); err != nil { - logger.Error(err, "failed to unmarshal ResourceBindings annotation, resetting") + if err := utiljson.Unmarshal([]byte(annValue), &rbs); err != nil { + logger.Error(err, "failed to unmarshal CRDs locks annotation, resetting") annValue = "" } } @@ -249,38 +277,32 @@ func (c *controller) process(ctx context.Context, key string) error { return err } - // migrate or rebuild APIBinding entries from status.boundResources? This happens only once per logical cluster. - // After the initial migration this is done by the apibinding controller. - if annValue == "" { - for _, b := range bindings { - for _, br := range b.Status.BoundResources { - gr := schema.GroupResource{Group: br.Group, Resource: br.Resource} - if old, found := rbs[gr.String()]; !found { - rbs[gr.String()] = apibindingreconciler.ExpirableLock{Lock: apibindingreconciler.Lock{Name: b.Name}} - } else { - logger.Info("resource is already bound to APIBinding. THIS ININCONSISTENT!", "resource", gr.String(), "apibinding", old.Name, "apibinding", b.Name) + // migrate CRD entries from legacy annotation if needed + // Only migrate if we don't have any CRD entries yet + if len(rbs) == 0 { + // Check if there's a legacy annotation with CRD entries to migrate + legacyAnn, legacyFound := lc.Annotations[apibindingreconciler.ResourceBindingsAnnotationKey] //nolint:staticcheck // SA1019 - using deprecated constant for backward compatibility during migration + if legacyFound && legacyAnn != "" { + var legacyRbs apibindingreconciler.ResourceBindingsAnnotation + if err := utiljson.Unmarshal([]byte(legacyAnn), &legacyRbs); err == nil { + for gr, lock := range legacyRbs { + if lock.CRD { + rbs[gr] = lock + } } } } - } else { - // remove bindings that are gone. - bindingNames := make(map[string]bool) - for _, b := range bindings { - bindingNames[b.Name] = true - } - for gr, b := range rbs { - if b.CRD { - continue - } - if _, found := bindingNames[b.Name]; !found { - logger.V(4).Info("removing binding", "binding", b.Name, "resource", gr) - delete(rbs, gr) - } - } } - // always add CRDs. + // Build set of binding names for consistency checking + bindingNames := make(map[string]bool) + for _, b := range bindings { + bindingNames[b.Name] = true + } + + // Build set of established CRD names and group-resources crdNames := make(map[string]bool) + crdGroupResources := make(map[string]bool) for _, crd := range crds { crdNames[crd.Name] = true @@ -290,46 +312,53 @@ func (c *controller) process(ctx context.Context, key string) error { } gr := schema.GroupResource{Group: crd.Spec.Group, Resource: crd.Spec.Names.Plural} - if old, found := rbs[gr.String()]; !found { - rbs[gr.String()] = apibindingreconciler.ExpirableLock{Lock: apibindingreconciler.Lock{CRD: true}} - } else if !old.CRD { - logger.Info("CRD exists and is established, but already bound to APIBinding. THIS IS INCONSISTENT!", "resource", gr.String(), "apibinding", old.Name) - } + crdGroupResources[gr.String()] = true + } + + // Build new CRD entries map + newRbs := make(apibindingreconciler.ResourceBindingsAnnotation) + + // First, add all established CRDs + for gr := range crdGroupResources { + newRbs[gr] = apibindingreconciler.ExpirableLock{Lock: apibindingreconciler.Lock{CRD: true}} } - // remove only when expired and gone, remove expiry when CRD exists. - for gr, b := range rbs { - if !b.CRD { + // Process existing CRD entries: preserve expiry for missing CRDs, clear expiry for existing ones + for gr, lock := range rbs { + if !lock.CRD { + // Skip non-CRD entries - they shouldn't be here but just in case continue } - if crdNames[gr] { - b.CRDExpiry = nil - rbs[gr] = b + if crdGroupResources[gr] { + // CRD exists and is established, it's already in newRbs above continue } - // CRD doesn't exist. - if b.CRDExpiry != nil && time.Now().After(b.CRDExpiry.Time) { + // CRD doesn't exist anymore + if lock.CRDExpiry != nil && time.Now().After(lock.CRDExpiry.Time) { logger.V(4).Info("removing expired CRD binding of non-existing CRD", "crd", gr) - delete(rbs, gr) + // Don't add to newRbs (effectively deleting) + } else { + // Preserve the entry with its expiry + newRbs[gr] = lock } } - // update annotation on LogicalCluster. - bs, err := json.Marshal(rbs) + // update CRD annotation on LogicalCluster using SSA + bs, err := utiljson.Marshal(newRbs) if err != nil { - return fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) + return fmt.Errorf("failed to marshal CRDs locks annotation: %w", err) } - if lc.Annotations[apibindingreconciler.ResourceBindingsAnnotationKey] == string(bs) { + if lc.Annotations[apibindingreconciler.LocksCRDsAnnotationKey] == string(bs) { return nil } - logger.V(4).Info("Updating"+apibindingreconciler.ResourceBindingsAnnotationKey, "old", annValue, "new", string(bs)) + logger.V(4).Info("Updating "+apibindingreconciler.LocksCRDsAnnotationKey, "old", annValue, "new", string(bs)) lc = lc.DeepCopy() if lc.Annotations == nil { lc.Annotations = make(map[string]string) } - lc.Annotations[apibindingreconciler.ResourceBindingsAnnotationKey] = string(bs) - if err := c.updateLogicalCluster(ctx, lc); err != nil { + lc.Annotations[apibindingreconciler.LocksCRDsAnnotationKey] = string(bs) + if err := c.updateLogicalClusterCRDs(ctx, lc); err != nil { return fmt.Errorf("failed to update LogicalCluster: %w", err) } diff --git a/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller_test.go b/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller_test.go index 34e27a140ff..7820befb520 100644 --- a/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller_test.go +++ b/pkg/reconciler/apis/logicalclustercleanup/logicalclustercleanup_controller_test.go @@ -52,7 +52,7 @@ func TestReconciler(t *testing.T) { "no annotations leads to migration – no crds or bindings": { logicalCluster: &corev1alpha1.LogicalCluster{}, want: &corev1alpha1.LogicalCluster{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - "internal.apis.kcp.io/resource-bindings": "{}", + "internal.apis.kcp.io/locks-crds": "{}", }}}, }, "update error": { @@ -72,7 +72,8 @@ func TestReconciler(t *testing.T) { &newAPIBinding().WithName("binding2").WithBoundResources("group", "cs", "group", "ds").APIBinding, }, want: &corev1alpha1.LogicalCluster{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - "internal.apis.kcp.io/resource-bindings": `{"as.group":{"n":"binding1"},"bs.group":{"n":"binding1"},"crd1s.group":{"c":true},"crd2s.group":{"c":true},"cs.group":{"n":"binding2"},"ds.group":{"n":"binding2"}}`, + // Note: With SSA split, logicalclustercleanup only writes CRD entries + "internal.apis.kcp.io/locks-crds": `{"crd1s.group":{"c":true},"crd2s.group":{"c":true}}`, }}}, }, "with annotation, only CRDs are added": { @@ -89,29 +90,34 @@ func TestReconciler(t *testing.T) { &newAPIBinding().WithName("binding2").WithBoundResources("group", "cs", "group", "ds").APIBinding, }, want: &corev1alpha1.LogicalCluster{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - "internal.apis.kcp.io/resource-bindings": `{"as.group":{"n":"binding1"},"crd1s.group":{"c":true},"crd2s.group":{"c":true}}`, + // With SSA split, logicalclustercleanup only manages CRD entries + // The old annotation is preserved for backward compatibility + "internal.apis.kcp.io/resource-bindings": `{"as.group":{"n":"binding1"},"crd1s.group":{"c":true}}`, + "internal.apis.kcp.io/locks-crds": `{"crd1s.group":{"c":true},"crd2s.group":{"c":true}}`, }}}, }, - "CRDs are not removed by default, but bindings are": { + "CRDs are not removed by default, but bindings are not managed by this controller": { logicalCluster: &corev1alpha1.LogicalCluster{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - "internal.apis.kcp.io/resource-bindings": `{"as.group":{"n":"binding1"},"bs.group":{"n":"binding1"},"crd1s.group":{"c":true},"crd2s.group":{"c":true},"cs.group":{"n":"binding2"},"ds.group":{"n":"binding2"}}`, + "internal.apis.kcp.io/locks-crds": `{"crd1s.group":{"c":true},"crd2s.group":{"c":true}}`, }}}, apiBindings: []*apisv1alpha2.APIBinding{ &newAPIBinding().WithName("binding1").WithBoundResources("group", "as", "group", "bs").APIBinding, }, - want: &corev1alpha1.LogicalCluster{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - "internal.apis.kcp.io/resource-bindings": `{"as.group":{"n":"binding1"},"bs.group":{"n":"binding1"},"crd1s.group":{"c":true},"crd2s.group":{"c":true}}`, - }}}, + want: nil, // No change expected - CRDs are already in the right place }, "Expired CRDs that don't exist are removed": { logicalCluster: &corev1alpha1.LogicalCluster{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ "internal.apis.kcp.io/resource-bindings": fmt.Sprintf(`{"crd1s.group":{"c":true,"e":%q},"crd2s.group":{"c":true,"e":%q},"crd3s.group":{"c":true,"e":%q}}`, expired, expired, notExpired), }}}, crds: []*apiextensionsv1.CustomResourceDefinition{ - newCRD("group", "crd1s"), + // crd1s is established, so it should be added without expiry + withEstablished(newCRD("group", "crd1s")), }, want: &corev1alpha1.LogicalCluster{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{ - "internal.apis.kcp.io/resource-bindings": fmt.Sprintf(`{"crd1s.group":{"c":true},"crd3s.group":{"c":true,"e":%q}}`, notExpired), + // With SSA split, controller migrates CRD entries from old annotation + // The old annotation is kept for backward compatibility + "internal.apis.kcp.io/resource-bindings": fmt.Sprintf(`{"crd1s.group":{"c":true,"e":%q},"crd2s.group":{"c":true,"e":%q},"crd3s.group":{"c":true,"e":%q}}`, expired, expired, notExpired), + "internal.apis.kcp.io/locks-crds": fmt.Sprintf(`{"crd1s.group":{"c":true},"crd3s.group":{"c":true,"e":%q}}`, notExpired), }}}, }, } @@ -125,7 +131,7 @@ func TestReconciler(t *testing.T) { } return tt.logicalCluster, nil }, - updateLogicalCluster: func(ctx context.Context, lc *corev1alpha1.LogicalCluster) error { + updateLogicalClusterCRDs: func(ctx context.Context, lc *corev1alpha1.LogicalCluster) error { if tt.updateError != nil { return tt.updateError } diff --git a/pkg/reconciler/dynamicrestmapper/dynamicrestmapper_dynamic_controller.go b/pkg/reconciler/dynamicrestmapper/dynamicrestmapper_dynamic_controller.go index ceda97a80e5..17aea64b861 100644 --- a/pkg/reconciler/dynamicrestmapper/dynamicrestmapper_dynamic_controller.go +++ b/pkg/reconciler/dynamicrestmapper/dynamicrestmapper_dynamic_controller.go @@ -171,12 +171,20 @@ func getResourceBindingsAnnJSON(lc *corev1alpha1.LogicalCluster) string { return jsonEmptyObj } - ann := lc.Annotations[apibinding.ResourceBindingsAnnotationKey] - if ann == "" { - ann = jsonEmptyObj + // Use GetAllResourceBindings to get the merged view of all annotations + rbs, err := apibinding.GetAllResourceBindings(lc) + if err != nil { + return jsonEmptyObj + } + if len(rbs) == 0 { + return jsonEmptyObj } - return ann + bs, err := json.Marshal(rbs) + if err != nil { + return jsonEmptyObj + } + return string(bs) } func diffResourceBindingsAnn(oldAnn, newAnn apibinding.ResourceBindingsAnnotation) (toRemove, toAdd apibinding.ResourceBindingsAnnotation) { @@ -215,7 +223,8 @@ func (c *DynamicTypesController) enqueueCRDUpdate(crd *apiextensionsv1.CustomRes return } - boundResourcesAnn, err := apibinding.GetResourceBindings(lc) + // Use GetAllResourceBindings to get the merged view of all lock annotations + boundResourcesAnn, err := apibinding.GetAllResourceBindings(lc) if err != nil { utilruntime.HandleError(err) return @@ -261,7 +270,8 @@ func (c *DynamicTypesController) enqueueAPIBindingUpdate(apiBinding *apisv1alpha utilruntime.HandleError(err) return } - boundResourcesAnn, err := apibinding.GetResourceBindings(lc) + // Use GetAllResourceBindings to get the merged view of all lock annotations + boundResourcesAnn, err := apibinding.GetAllResourceBindings(lc) if err != nil { utilruntime.HandleError(err) return diff --git a/pkg/virtual/replication/authorizer/content.go b/pkg/virtual/replication/authorizer/content.go index 8a73edc65df..c4db7aedc2a 100644 --- a/pkg/virtual/replication/authorizer/content.go +++ b/pkg/virtual/replication/authorizer/content.go @@ -122,7 +122,7 @@ func (a *contentAuthorizer) Authorize(ctx context.Context, attr authorizer.Attri if err != nil { return authorizer.DecisionNoOpinion, "logical cluster not found", err } - boundResources, err := apibinding.GetResourceBindings(lc) + boundResources, err := apibinding.GetAllResourceBindings(lc) if err != nil { return authorizer.DecisionNoOpinion, "failed to retrieve bound resources", err } diff --git a/pkg/virtual/replication/authorizer/content_test.go b/pkg/virtual/replication/authorizer/content_test.go index 5a6a20a68c3..08444bf4dfe 100644 --- a/pkg/virtual/replication/authorizer/content_test.go +++ b/pkg/virtual/replication/authorizer/content_test.go @@ -319,7 +319,7 @@ func TestContentAuthorizer(t *testing.T) { return &corev1alpha1.LogicalCluster{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - apibinding.ResourceBindingsAnnotationKey: `{"resource.group": {"n": "apibinding-1"}}`, + apibinding.ResourceBindingsAnnotationKey: `{"resource.group": {"n": "apibinding-1"}}`, //nolint:staticcheck // SA1019 - using deprecated constant in test }, }, }, nil @@ -400,7 +400,7 @@ func TestContentAuthorizer(t *testing.T) { return &corev1alpha1.LogicalCluster{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - apibinding.ResourceBindingsAnnotationKey: `{"resource.group": {"n": "apibinding-1"}}`, + apibinding.ResourceBindingsAnnotationKey: `{"resource.group": {"n": "apibinding-1"}}`, //nolint:staticcheck // SA1019 - using deprecated constant in test }, }, }, nil From cdbd87baaf3fca47eb534b5b51158eb1463d4ee5 Mon Sep 17 00:00:00 2001 From: Mangirdas Judeikis Date: Wed, 13 May 2026 10:50:01 +0300 Subject: [PATCH 2/2] simplify code --- .../apis/apibinding/apibinding_controller.go | 11 +- .../apis/apibinding/apibinding_reconcile.go | 21 +- .../apis/apibinding/logical_cluster_lock.go | 222 +++--------------- .../apibinding/logical_cluster_lock_test.go | 32 ++- 4 files changed, 58 insertions(+), 228 deletions(-) diff --git a/pkg/reconciler/apis/apibinding/apibinding_controller.go b/pkg/reconciler/apis/apibinding/apibinding_controller.go index 9bb8898e46d..2c6f2f43a67 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_controller.go +++ b/pkg/reconciler/apis/apibinding/apibinding_controller.go @@ -290,10 +290,13 @@ func NewController( c.enqueueLogicalCluster(tombstone.Obj[*corev1alpha1.LogicalCluster](obj), logger, "") }, UpdateFunc: func(old, obj interface{}) { - was := old.(*corev1alpha1.LogicalCluster).Annotations[ResourceBindingsAnnotationKey] - is := obj.(*corev1alpha1.LogicalCluster).Annotations[ResourceBindingsAnnotationKey] - if was != is { - c.enqueueLogicalCluster(tombstone.Obj[*corev1alpha1.LogicalCluster](obj), logger, "") + oldLC := old.(*corev1alpha1.LogicalCluster) + newLC := obj.(*corev1alpha1.LogicalCluster) + for _, key := range []string{LocksBindingsAnnotationKey, LocksCRDsAnnotationKey, LocksPendingAnnotationKey} { + if oldLC.Annotations[key] != newLC.Annotations[key] { + c.enqueueLogicalCluster(tombstone.Obj[*corev1alpha1.LogicalCluster](obj), logger, "") + return + } } }, })) diff --git a/pkg/reconciler/apis/apibinding/apibinding_reconcile.go b/pkg/reconciler/apis/apibinding/apibinding_reconcile.go index 0f85f2312ab..4223a5494fc 100644 --- a/pkg/reconciler/apis/apibinding/apibinding_reconcile.go +++ b/pkg/reconciler/apis/apibinding/apibinding_reconcile.go @@ -281,10 +281,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp // for intentional removal of schemas, versus movement of schemas // to another APIExport. // Read current bindings annotation to check if we need to update - bindingsBefore, err := GetBindingsLocks(lc) - if err != nil { - return err - } + bindingsBefore := lc.Annotations[LocksBindingsAnnotationKey] lc, _, skipped, err = WithLockedResourcesForBindings(crds, time.Now(), lc, grs.UnsortedList(), ExpirableLock{ Lock: Lock{Name: apiBinding.Name}, }) @@ -315,21 +312,7 @@ func (r *bindingReconciler) reconcile(ctx context.Context, apiBinding *apisv1alp return err } - // Check if bindings annotation changed by comparing before and after - bindingsAfter, err := GetBindingsLocks(lc) - if err != nil { - return err - } - bindingsChanged := len(bindingsBefore) != len(bindingsAfter) - if !bindingsChanged { - for k, v := range bindingsBefore { - if v2, ok := bindingsAfter[k]; !ok || v != v2 { - bindingsChanged = true - break - } - } - } - if bindingsChanged { + if lc.Annotations[LocksBindingsAnnotationKey] != bindingsBefore { if err := r.updateLogicalCluster(ctx, lc); err != nil { return err } diff --git a/pkg/reconciler/apis/apibinding/logical_cluster_lock.go b/pkg/reconciler/apis/apibinding/logical_cluster_lock.go index b0e6914216f..5296520e2da 100644 --- a/pkg/reconciler/apis/apibinding/logical_cluster_lock.go +++ b/pkg/reconciler/apis/apibinding/logical_cluster_lock.go @@ -17,7 +17,6 @@ limitations under the License. package apibinding import ( - "context" "fmt" "time" @@ -26,9 +25,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/json" - "github.com/kcp-dev/logicalcluster/v3" corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" - kcpclientset "github.com/kcp-dev/sdk/client/clientset/versioned/cluster" ) const ( @@ -58,7 +55,9 @@ const ( FieldManagerPending = "kcp-crdnooverlappinggvr" ) -// Lock is a lock for a resource, part of the apis.kcp.io/resource-bindings annotation. +// Lock is a lock for a resource, part of the per-writer lock annotations on +// the LogicalCluster (LocksBindingsAnnotationKey, LocksCRDsAnnotationKey, +// LocksPendingAnnotationKey). type Lock struct { // Name is the name of the APIBinding, or empty. Name string `json:"n,omitempty"` @@ -76,9 +75,9 @@ type ExpirableLock struct { } // ResourceBindingsAnnotation is a map of "." to bindings. It -// is stored as a JSON string in the LogicalCluster annotation -// apis.kcp.io/resource-bindings. It serves as a lock for resources -// to prevent races of multiple bindings or CRDs owning the same resource. +// is stored as a JSON string in the per-writer lock annotations on the +// LogicalCluster. It serves as a lock for resources to prevent races of +// multiple bindings or CRDs owning the same resource. type ResourceBindingsAnnotation map[string]ExpirableLock // UnmarshalResourceBindingsAnnotation unmarshals JSON-formatted string @@ -183,182 +182,35 @@ func GetAllResourceBindings(lc *corev1alpha1.LogicalCluster) (ResourceBindingsAn return result, nil } -// WithLockedResources tries to lock the resources for the given binding. It -// returns those resources that got successfully locked. If a resource is already -// locked by another binding, it is skipped and returned in the second return -// value. -// -// The logical cluster is not mutated. -func WithLockedResources(crds []*apiextensionsv1.CustomResourceDefinition, now time.Time, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding ExpirableLock) (*corev1alpha1.LogicalCluster, []schema.GroupResource, map[schema.GroupResource]Lock, error) { - v, found := lc.Annotations[ResourceBindingsAnnotationKey] - if !found || v == "" { - return nil, nil, nil, fmt.Errorf("%s annotation not found, migration has to happen first", ResourceBindingsAnnotationKey) - } - - rbs, err := UnmarshalResourceBindingsAnnotation(v) - if err != nil { - return nil, nil, nil, err - } - - crdNames := make(map[string]bool, len(crds)) - for _, crd := range crds { - crdNames[crd.Name] = true - } - - // find what resources need to be newly locked - skipped := make(map[schema.GroupResource]Lock) - newlyLocked := make([]schema.GroupResource, 0, len(grs)) - locked := make([]schema.GroupResource, 0, len(grs)) - for _, gr := range grs { - b, found := rbs[gr.String()] - if !found || b.Lock == binding.Lock { - newlyLocked = append(newlyLocked, gr) - locked = append(locked, gr) - continue - } - if b.CRD && !crdNames[gr.String()] && b.CRDExpiry != nil && now.After(b.CRDExpiry.Time) { - // CRD binding expired, and CRD is not known - newlyLocked = append(newlyLocked, gr) - locked = append(locked, gr) - continue - } - skipped[gr] = b.Lock - } - - // don't do anything if no resources need to be locked - if len(newlyLocked) == 0 { - return lc, locked, skipped, nil - } - - // update the LogicalCluster with the new binding information - for _, gr := range newlyLocked { - rbs[gr.String()] = binding - } - lc = lc.DeepCopy() - bs, err := json.Marshal(rbs) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) - } - lc.Annotations[ResourceBindingsAnnotationKey] = string(bs) - - return lc, locked, skipped, nil -} - -// unlockResource unlocks the resource for the given binding. It updates the -// LogicalCluster with the new binding information IFF at least one resource -// was unlocked. -func unlockResource(ctx context.Context, kcpClusterClient kcpclientset.ClusterInterface, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding Lock) error { //nolint:unused // will be used eventually. - v, found := lc.Annotations[ResourceBindingsAnnotationKey] - if !found { - return fmt.Errorf("%s annotation not found, migration has to happen first", ResourceBindingsAnnotationKey) - } - - var rbs ResourceBindingsAnnotation - if err := json.Unmarshal([]byte(v), &rbs); err != nil { - return fmt.Errorf("failed to unmarshal ResourceBindings annotation: %w", err) - } - - unlocked := false - for _, gr := range grs { - if bound, found := rbs[gr.String()]; found && bound.Lock == binding { - delete(rbs, gr.String()) - unlocked = true - } - } - if !unlocked { - return nil - } - - lc = lc.DeepCopy() - bs, err := json.Marshal(rbs) - if err != nil { - return fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) - } - lc.Annotations[ResourceBindingsAnnotationKey] = string(bs) - - _, err = kcpClusterClient.CoreV1alpha1().LogicalClusters().Cluster(logicalcluster.From(lc).Path()).Update(ctx, lc, metav1.UpdateOptions{}) - if err != nil { - return err - } - - return nil -} - // WithLockedResourcesForBindings locks resources for the APIBinding controller. -// It reads from the merged view of all annotations but only writes to the bindings annotation. -// This allows the apibinding controller to use SSA on its own key. +// It reads from the merged view of all annotations but writes only to +// LocksBindingsAnnotationKey, so the apibinding controller can use SSA on its +// own key without conflicting with other writers. func WithLockedResourcesForBindings(crds []*apiextensionsv1.CustomResourceDefinition, now time.Time, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding ExpirableLock) (*corev1alpha1.LogicalCluster, []schema.GroupResource, map[schema.GroupResource]Lock, error) { - // Read from all sources to check locks - rbs, err := GetAllResourceBindings(lc) - if err != nil { - return nil, nil, nil, err - } - - crdNames := make(map[string]bool, len(crds)) - for _, crd := range crds { - crdNames[crd.Name] = true - } - - // Read the bindings-specific annotation to update - bindingsRbs, err := GetBindingsLocks(lc) - if err != nil { - return nil, nil, nil, err - } - - // find what resources need to be newly locked - skipped := make(map[schema.GroupResource]Lock) - newlyLocked := make([]schema.GroupResource, 0, len(grs)) - locked := make([]schema.GroupResource, 0, len(grs)) - for _, gr := range grs { - b, found := rbs[gr.String()] - if !found || b.Lock == binding.Lock { - newlyLocked = append(newlyLocked, gr) - locked = append(locked, gr) - continue - } - if b.CRD && !crdNames[gr.String()] && b.CRDExpiry != nil && now.After(b.CRDExpiry.Time) { - // CRD binding expired, and CRD is not known - newlyLocked = append(newlyLocked, gr) - locked = append(locked, gr) - continue - } - skipped[gr] = b.Lock - } - - // don't do anything if no resources need to be locked - if len(newlyLocked) == 0 { - return lc, locked, skipped, nil - } - - // update only the bindings annotation - for _, gr := range newlyLocked { - bindingsRbs[gr.String()] = binding - } - lc = lc.DeepCopy() - bs, err := json.Marshal(bindingsRbs) - if err != nil { - return nil, nil, nil, fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) - } - if lc.Annotations == nil { - lc.Annotations = make(map[string]string) - } - lc.Annotations[LocksBindingsAnnotationKey] = string(bs) - - return lc, locked, skipped, nil + return withLockedResources(crds, now, lc, grs, binding, LocksBindingsAnnotationKey, GetBindingsLocks) } -// GetAllResourceBindingsForRead returns all resource bindings for reading purposes. -// This is a convenience alias for GetAllResourceBindings. -func GetAllResourceBindingsForRead(lc *corev1alpha1.LogicalCluster) (ResourceBindingsAnnotation, error) { - return GetAllResourceBindings(lc) +// WithLockedResourcesForPending locks resources for the admission plugin +// (pending locks). It reads from the merged view of all annotations but writes +// only to LocksPendingAnnotationKey, so the admission plugin can use SSA on +// its own key without conflicting with other writers. +func WithLockedResourcesForPending(crds []*apiextensionsv1.CustomResourceDefinition, now time.Time, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding ExpirableLock) (*corev1alpha1.LogicalCluster, []schema.GroupResource, map[schema.GroupResource]Lock, error) { + return withLockedResources(crds, now, lc, grs, binding, LocksPendingAnnotationKey, GetPendingLocks) } -// WithLockedResourcesForPending locks resources for the admission plugin (pending locks). -// It reads from the merged view of all annotations but only writes to the pending annotation. -// This allows the admission plugin to use SSA on its own key. -func WithLockedResourcesForPending(crds []*apiextensionsv1.CustomResourceDefinition, now time.Time, lc *corev1alpha1.LogicalCluster, grs []schema.GroupResource, binding ExpirableLock) (*corev1alpha1.LogicalCluster, []schema.GroupResource, map[schema.GroupResource]Lock, error) { - // Read from all sources to check locks - rbs, err := GetAllResourceBindings(lc) +// withLockedResources is the shared implementation for the per-writer +// WithLockedResourcesFor* functions. It reads conflicts from the merged view +// of all annotations but only mutates the annotation identified by targetKey. +func withLockedResources( + crds []*apiextensionsv1.CustomResourceDefinition, + now time.Time, + lc *corev1alpha1.LogicalCluster, + grs []schema.GroupResource, + binding ExpirableLock, + targetKey string, + readTarget func(*corev1alpha1.LogicalCluster) (ResourceBindingsAnnotation, error), +) (*corev1alpha1.LogicalCluster, []schema.GroupResource, map[schema.GroupResource]Lock, error) { + merged, err := GetAllResourceBindings(lc) if err != nil { return nil, nil, nil, err } @@ -368,18 +220,16 @@ func WithLockedResourcesForPending(crds []*apiextensionsv1.CustomResourceDefinit crdNames[crd.Name] = true } - // Read the pending-specific annotation to update - pendingRbs, err := GetPendingLocks(lc) + target, err := readTarget(lc) if err != nil { return nil, nil, nil, err } - // find what resources need to be newly locked skipped := make(map[schema.GroupResource]Lock) newlyLocked := make([]schema.GroupResource, 0, len(grs)) locked := make([]schema.GroupResource, 0, len(grs)) for _, gr := range grs { - b, found := rbs[gr.String()] + b, found := merged[gr.String()] if !found || b.Lock == binding.Lock { newlyLocked = append(newlyLocked, gr) locked = append(locked, gr) @@ -394,24 +244,22 @@ func WithLockedResourcesForPending(crds []*apiextensionsv1.CustomResourceDefinit skipped[gr] = b.Lock } - // don't do anything if no resources need to be locked if len(newlyLocked) == 0 { return lc, locked, skipped, nil } - // update only the pending annotation for _, gr := range newlyLocked { - pendingRbs[gr.String()] = binding + target[gr.String()] = binding } - lc = lc.DeepCopy() - bs, err := json.Marshal(pendingRbs) + bs, err := json.Marshal(target) if err != nil { return nil, nil, nil, fmt.Errorf("failed to marshal ResourceBindings annotation: %w", err) } + lc = lc.DeepCopy() if lc.Annotations == nil { lc.Annotations = make(map[string]string) } - lc.Annotations[LocksPendingAnnotationKey] = string(bs) + lc.Annotations[targetKey] = string(bs) return lc, locked, skipped, nil } diff --git a/pkg/reconciler/apis/apibinding/logical_cluster_lock_test.go b/pkg/reconciler/apis/apibinding/logical_cluster_lock_test.go index bf0e681fd1d..2f35cf8ecc5 100644 --- a/pkg/reconciler/apis/apibinding/logical_cluster_lock_test.go +++ b/pkg/reconciler/apis/apibinding/logical_cluster_lock_test.go @@ -30,7 +30,7 @@ import ( corev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1" ) -func TestWithLockedResources(t *testing.T) { +func TestWithLockedResourcesForBindings(t *testing.T) { now := time.Now().UTC() expired := time.Now().Add(-1).UTC().Format(time.RFC3339) notExpired := time.Now().Add(time.Hour).UTC().Format(time.RFC3339) @@ -46,17 +46,13 @@ func TestWithLockedResources(t *testing.T) { wantSkipped map[schema.GroupResource]Lock wantErr bool }{ - "no annotation errors": { - lc: &corev1alpha1.LogicalCluster{}, - grs: []schema.GroupResource{{Group: "group", Resource: "foos"}}, - binding: ExpirableLock{Lock: Lock{Name: "binding"}}, - wantErr: true, - }, - "empty annotation errors": { - lc: newLogicalClusterWithAnnotation(""), - grs: []schema.GroupResource{{Group: "group", Resource: "foos"}}, - binding: ExpirableLock{Lock: Lock{Name: "binding"}}, - wantErr: true, + "no annotation succeeds": { + lc: &corev1alpha1.LogicalCluster{}, + grs: []schema.GroupResource{{Group: "group", Resource: "foos"}}, + binding: ExpirableLock{Lock: Lock{Name: "binding"}}, + want: newLogicalClusterWithAnnotation(`{"foos.group":{"n":"binding"}}`), + wantLocked: []schema.GroupResource{{Group: "group", Resource: "foos"}}, + wantSkipped: map[schema.GroupResource]Lock{}, }, "invalid annotation errors": { lc: newLogicalClusterWithAnnotation("invalid"), @@ -175,19 +171,19 @@ func TestWithLockedResources(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { - got, locked, skipped, err := WithLockedResources(tt.crds, now, tt.lc, tt.grs, tt.binding) + got, locked, skipped, err := WithLockedResourcesForBindings(tt.crds, now, tt.lc, tt.grs, tt.binding) if (err != nil) != tt.wantErr { - t.Errorf("WithLockedResources() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("WithLockedResourcesForBindings() error = %v, wantErr %v", err, tt.wantErr) return } if diff := cmp.Diff(tt.want, got); diff != "" { - t.Errorf("WithLockedResources() +got -want\n%s", diff) + t.Errorf("WithLockedResourcesForBindings() +got -want\n%s", diff) } if diff := cmp.Diff(locked, tt.wantLocked); diff != "" { - t.Errorf("WithLockedResources() +got -want:\n%s", diff) + t.Errorf("WithLockedResourcesForBindings() +got -want:\n%s", diff) } if diff := cmp.Diff(skipped, tt.wantSkipped); diff != "" { - t.Errorf("WithLockedResources() +got -want:\n%s", diff) + t.Errorf("WithLockedResourcesForBindings() +got -want:\n%s", diff) } }) } @@ -197,7 +193,7 @@ func newLogicalClusterWithAnnotation(ann string) *corev1alpha1.LogicalCluster { return &corev1alpha1.LogicalCluster{ ObjectMeta: metav1.ObjectMeta{ Annotations: map[string]string{ - ResourceBindingsAnnotationKey: ann, + LocksBindingsAnnotationKey: ann, }, }, }