From 74bd53b338921283a661c1761359f241052faade Mon Sep 17 00:00:00 2001 From: Jathavedhan M Date: Mon, 8 Jun 2026 17:18:25 +0530 Subject: [PATCH] [manila-csi-plugin] Retry proxied probe on Unavailable The Manila CSI node plugin fatally exits on startup when the proxied CSI driver socket (e.g. NFS) is not yet available, because ProbeForever only retries on DeadlineExceeded and immediately returns on Unavailable. Wrap the ProbeForever call in a retry loop that retries on codes.Unavailable within the existing 15-second context timeout, so transient connection errors during concurrent DaemonSet restarts no longer cause a fatal exit. Signed-off-by: Jathavedhan M --- pkg/csi/manila/driver.go | 19 ++++- pkg/csi/manila/driver_test.go | 136 ++++++++++++++++++++++++++++++++++ 2 files changed, 153 insertions(+), 2 deletions(-) create mode 100644 pkg/csi/manila/driver_test.go diff --git a/pkg/csi/manila/driver.go b/pkg/csi/manila/driver.go index 30d1b1f5d3..390ac86dc2 100644 --- a/pkg/csi/manila/driver.go +++ b/pkg/csi/manila/driver.go @@ -29,6 +29,8 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" v1 "k8s.io/client-go/listers/core/v1" "k8s.io/cloud-provider-openstack/pkg/csi/manila/csiclient" "k8s.io/cloud-provider-openstack/pkg/csi/manila/manilaclient" @@ -274,8 +276,21 @@ func (d *Driver) initProxiedDriver() (csiNodeCapabilitySet, error) { identityClient := d.csiClientBuilder.NewIdentityServiceClient(conn) - if err = identityClient.ProbeForever(ctx, conn, time.Second*5); err != nil { - return nil, fmt.Errorf("probe failed: %v", err) + for { + if err = identityClient.ProbeForever(ctx, conn, time.Second*5); err == nil { + break + } + if status.Code(err) != codes.Unavailable { + return nil, fmt.Errorf("probe failed: %v", err) + } + klog.Warningf("proxied CSI driver probe returned Unavailable for %s, retrying: %v", d.fwdEndpoint, err) + select { + case <-ctx.Done(): + case <-time.After(time.Second): + } + if ctx.Err() != nil { + return nil, fmt.Errorf("timed out probing proxied CSI driver %s: %v", d.fwdEndpoint, err) + } } pluginInfo, err := identityClient.GetPluginInfo(ctx) diff --git a/pkg/csi/manila/driver_test.go b/pkg/csi/manila/driver_test.go new file mode 100644 index 0000000000..29f9645d06 --- /dev/null +++ b/pkg/csi/manila/driver_test.go @@ -0,0 +1,136 @@ +/* +Copyright 2026 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package manila + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + "k8s.io/cloud-provider-openstack/pkg/csi/manila/csiclient" +) + +type unavailableIdentityClient struct { + unavailableCount int32 + calls int32 +} + +func (c *unavailableIdentityClient) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) { + return &csi.ProbeResponse{}, nil +} + +func (c *unavailableIdentityClient) GetPluginInfo(context.Context) (*csi.GetPluginInfoResponse, error) { + return &csi.GetPluginInfoResponse{ + Name: "fake-driver", + VendorVersion: "1.0.0", + }, nil +} + +func (c *unavailableIdentityClient) ProbeForever(context.Context, *grpc.ClientConn, time.Duration) error { + n := atomic.AddInt32(&c.calls, 1) + if n <= c.unavailableCount { + return status.Error(codes.Unavailable, "not ready yet") + } + return nil +} + +type stubNodeClient struct{} + +func (c *stubNodeClient) GetCapabilities(context.Context) (*csi.NodeGetCapabilitiesResponse, error) { + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: []*csi.NodeServiceCapability{ + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + }, + }, + }, nil +} + +func (c *stubNodeClient) GetVolumeStats(ctx context.Context, req *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) { + return nil, status.Error(codes.Unimplemented, "") +} + +func (c *stubNodeClient) StageVolume(context.Context, *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + return &csi.NodeStageVolumeResponse{}, nil +} + +func (c *stubNodeClient) UnstageVolume(context.Context, *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + return &csi.NodeUnstageVolumeResponse{}, nil +} + +func (c *stubNodeClient) PublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { + return &csi.NodePublishVolumeResponse{}, nil +} + +func (c *stubNodeClient) UnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { + return &csi.NodeUnpublishVolumeResponse{}, nil +} + +type testCSIClientBuilder struct { + identityClient csiclient.Identity + nodeClient csiclient.Node +} + +func (b *testCSIClientBuilder) NewConnection(string) (*grpc.ClientConn, error) { + return grpc.NewClient("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) +} + +func (b *testCSIClientBuilder) NewConnectionWithContext(context.Context, string) (*grpc.ClientConn, error) { + return grpc.NewClient("localhost", grpc.WithTransportCredentials(insecure.NewCredentials())) +} + +func (b *testCSIClientBuilder) NewNodeServiceClient(conn *grpc.ClientConn) csiclient.Node { + return b.nodeClient +} + +func (b *testCSIClientBuilder) NewIdentityServiceClient(conn *grpc.ClientConn) csiclient.Identity { + return b.identityClient +} + +func TestInitProxiedDriverRetryOnUnavailable(t *testing.T) { + idClient := &unavailableIdentityClient{unavailableCount: 3} + builder := &testCSIClientBuilder{ + identityClient: idClient, + nodeClient: &stubNodeClient{}, + } + + d := &Driver{ + fwdEndpoint: "unix:///tmp/fake.sock", + csiClientBuilder: builder, + } + + caps, err := d.initProxiedDriver() + if err != nil { + t.Fatalf("expected success after retries, got error: %v", err) + } + if !caps[csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME] { + t.Error("expected STAGE_UNSTAGE_VOLUME capability") + } + if atomic.LoadInt32(&idClient.calls) != 4 { + t.Errorf("expected 4 ProbeForever calls (3 Unavailable + 1 success), got %d", idClient.calls) + } +}