Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions internal/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ import (

docker "github.com/moby/moby/client"
"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -289,13 +292,13 @@ func (h *Harness) Config() (*rest.Config, error) {
}

func (h *Harness) waitForFunctionalCluster() error {
err := kubernetes.WaitForSA(h.config, "default", "default")
err := h.waitForSA("default", "default")
if err == nil {
return nil
}
// if there is a namespace provided but no "default"/"default" SA found, also check a SA in the provided NS
if h.TestSuite.Namespace != "" {
tempErr := kubernetes.WaitForSA(h.config, "default", h.TestSuite.Namespace)
tempErr := h.waitForSA("default", h.TestSuite.Namespace)
if tempErr == nil {
return nil
}
Expand All @@ -304,6 +307,34 @@ func (h *Harness) waitForFunctionalCluster() error {
return err
}

// waitForSA waits for a service account to be present.
func (h *Harness) waitForSA(name, namespace string) error {
c, err := kubernetes.NewRetryClient(h.config, client.Options{
Scheme: kubernetes.Scheme(),
})
if err != nil {
return err
}

obj := &corev1.ServiceAccount{}

key := client.ObjectKey{
Namespace: namespace,
Name: name,
}
return wait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, 60*time.Second, true, func(ctx context.Context) (done bool, err error) {
err = c.Get(ctx, key, obj)
if apiErrors.IsNotFound(err) {
return false, nil
}
if err != nil {
h.T.Logf("Error waiting for service account %s/%s (will retry): %v", namespace, name, err)
return false, nil
}
return true, nil
})
}

// Client returns the current Kubernetes client for the test harness.
func (h *Harness) Client(forceNew bool) (client.Client, error) {
h.clientLock.Lock()
Expand Down
53 changes: 17 additions & 36 deletions internal/kubernetes/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,38 @@ package kubernetes

import (
"context"
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// WaitForDelete waits for the provide runtime objects to be deleted from cluster
func WaitForDelete(c *RetryClient, objs []runtime.Object) error {
// Wait for resources to be deleted.
return wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, 10*time.Second, true, func(ctx context.Context) (done bool, err error) {
for _, obj := range objs {
// WaitForDelete waits for the provided runtime objects to be deleted from cluster, up to duration.
// Retries on transient errors.
func WaitForDelete(cl client.Client, toDelete []client.Object, duration time.Duration) error {
lastCheckMsg := ""
err := wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, duration, true, func(ctx context.Context) (done bool, err error) {
for _, obj := range toDelete {
actual := &unstructured.Unstructured{}
actual.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
err = c.Get(ctx, ObjectKey(obj), actual)
if err == nil || !errors.IsNotFound(err) {
return false, err
err = cl.Get(ctx, ObjectKey(obj), actual)
if err == nil {
lastCheckMsg = fmt.Sprintf("%v %s still exists", obj.GetObjectKind().GroupVersionKind(), obj.GetName())
return false, nil
}
if !errors.IsNotFound(err) {
lastCheckMsg = fmt.Sprintf("checking existence of %v %s failed: %v", obj.GetObjectKind().GroupVersionKind(), obj.GetName(), err)
return false, nil
}
}

return true, nil
})
}

// WaitForSA waits for a service account to be present
func WaitForSA(config *rest.Config, name, namespace string) error {
c, err := NewRetryClient(config, client.Options{
Scheme: Scheme(),
})
if err != nil {
return err
return fmt.Errorf("timed out waiting for resource deletion (result of last check was: %q): %w", lastCheckMsg, err)
}

obj := &corev1.ServiceAccount{}

key := client.ObjectKey{
Namespace: namespace,
Name: name,
}
return wait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, 60*time.Second, true, func(ctx context.Context) (done bool, err error) {
err = c.Get(ctx, key, obj)
if errors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
return true, nil
})
return nil
}
86 changes: 86 additions & 0 deletions internal/kubernetes/wait_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package kubernetes

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
)

func testObj() *unstructured.Unstructured {
obj := &unstructured.Unstructured{}
obj.SetGroupVersionKind(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"})
obj.SetName("cm")
obj.SetNamespace("default")
return obj
}

func TestWaitForDelete_AlreadyGone(t *testing.T) {
cl := fake.NewClientBuilder().Build()

err := WaitForDelete(cl, []client.Object{testObj()}, time.Second*2)
require.NoError(t, err)
}

func TestWaitForDelete_TransientErrorThenGone(t *testing.T) {
callCount := 0
cl := fake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{
Get: func(context.Context, client.WithWatch, client.ObjectKey, client.Object, ...client.GetOption) error {
callCount++
if callCount <= 3 {
return fmt.Errorf("transient API error")
}
return k8serrors.NewNotFound(schema.GroupResource{Resource: "configmaps"}, "cm")
},
}).Build()

err := WaitForDelete(cl, []client.Object{testObj()}, time.Second*2)
require.NoError(t, err)
assert.Greater(t, callCount, 3)
}

func TestWaitForDelete_StillExistsThenGone(t *testing.T) {
callCount := 0
cl := fake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{
Get: func(context.Context, client.WithWatch, client.ObjectKey, client.Object, ...client.GetOption) error {
callCount++
if callCount <= 2 {
return nil
}
return k8serrors.NewNotFound(schema.GroupResource{Resource: "configmaps"}, "cm")
},
}).Build()

err := WaitForDelete(cl, []client.Object{testObj()}, time.Second*2)
require.NoError(t, err)
assert.Greater(t, callCount, 2)
}

func TestWaitForDelete_PersistentErrorTimesOut(t *testing.T) {
callCount := 0
cl := fake.NewClientBuilder().WithInterceptorFuncs(interceptor.Funcs{
Get: func(context.Context, client.WithWatch, client.ObjectKey, client.Object, ...client.GetOption) error {
callCount++
if callCount <= 2 {
return fmt.Errorf("initial transient error")
}
return fmt.Errorf("persistent API error")
},
}).Build()

err := WaitForDelete(cl, []client.Object{testObj()}, time.Second*2)
assert.Error(t, err)
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.ErrorContains(t, err, "result of last check was:")
assert.ErrorContains(t, err, "failed: persistent API error")
assert.NotContains(t, err.Error(), "initial transient error")
}
25 changes: 1 addition & 24 deletions internal/step/step.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -166,29 +165,7 @@ func (s *Step) DeleteExisting(namespace string) error {
s.Logger.Log(kubernetes.ResourceID(del), action)
}

// Wait for resources to be deleted.
lastCheckMsg := ""
err = wait.PollUntilContextTimeout(context.TODO(), 100*time.Millisecond, time.Duration(s.GetTimeout())*time.Second, true, func(ctx context.Context) (done bool, err error) {
for _, obj := range toDelete {
actual := &unstructured.Unstructured{}
actual.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
err = cl.Get(ctx, kubernetes.ObjectKey(obj), actual)
if err == nil {
lastCheckMsg = fmt.Sprintf("%v %s still exists", obj.GetObjectKind().GroupVersionKind(), obj.GetName())
return false, nil
}
if !k8serrors.IsNotFound(err) {
lastCheckMsg = fmt.Sprintf("checking existence of %v %s failed: %v", obj.GetObjectKind().GroupVersionKind(), obj.GetName(), err)
return false, err
}
}

return true, nil
})
if err != nil {
return fmt.Errorf("timed out waiting for resource deletion (result of last check was: %q): %w", lastCheckMsg, err)
}
return nil
return kubernetes.WaitForDelete(cl, toDelete, time.Duration(s.GetTimeout())*time.Second)
}

// Create applies all resources defined in the Apply list.
Expand Down
3 changes: 2 additions & 1 deletion internal/testcase/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ func (c *Case) deleteNamespace(cl clientWithKubeConfig) error {
return true, nil
}
if err != nil {
return false, fmt.Errorf("failed to check deletion of namespace %q: %w", c.ns.name, err)
cl.Logf("failed to check deletion of namespace %q (will retry): %v", c.ns.name, err)
return false, nil
}
return false, nil
})
Expand Down
Loading