Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/ddc/alluxio/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ const (
defaultGracefulShutdownLimits int32 = 3
defaultCleanCacheGracePeriodSeconds int32 = 60

// defaultWorkerRPCPort is the Alluxio worker Thrift RPC port used when the
// runtime spec does not override alluxio.worker.rpc.port.
defaultWorkerRPCPort = 29999

MountConfigStorage = "ALLUXIO_MOUNT_CONFIG_STORAGE"
ConfigmapStorageName = "configmap"
)
79 changes: 79 additions & 0 deletions pkg/ddc/alluxio/operations/decommission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2024 The Fluid Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operations

import "strings"

// DecommissionWorkers signals the Alluxio master to decommission the given
// workers. Each address must be in "<host>:<rpcPort>" form.
// The call is idempotent: re-issuing it against an already-decommissioned
// worker is safe.
func (a AlluxioFileUtils) DecommissionWorkers(addresses []string) error {
if len(addresses) == 0 {
return nil
}
command := []string{
"alluxio", "fsadmin", "decommission",
"--addresses", strings.Join(addresses, ","),
}
_, _, err := a.exec(command, false)
if err != nil {
a.log.Error(err, "AlluxioFileUtils.DecommissionWorkers() failed", "addresses", addresses)
}
return err
}

// CountActiveWorkers returns the number of workers currently tracked by the
// Alluxio master according to "alluxio fsadmin report capacity".
func (a AlluxioFileUtils) CountActiveWorkers() (int, error) {
report, _, err := a.exec([]string{"alluxio", "fsadmin", "report", "capacity"}, false)
if err != nil {
a.log.Error(err, "AlluxioFileUtils.CountActiveWorkers() failed")
return 0, err
}
return parseActiveWorkerCount(report), nil
}

// parseActiveWorkerCount counts workers in the capacity report produced by
// "alluxio fsadmin report capacity". Worker entries begin at the non-indented
// line after the "Worker Name" header; the indented line that follows each
// entry contains the used-capacity detail.
//
// Worker Name Last Heartbeat Storage MEM
// 192.168.1.147 0 capacity 2048.00MB <- worker entry
// used 443.89MB (21%) <- detail, indented
// 192.168.1.146 0 capacity 2048.00MB <- worker entry
// used 0B (0%)
func parseActiveWorkerCount(report string) int {
inWorkerSection := false
count := 0
for _, line := range strings.Split(report, "\n") {
if strings.HasPrefix(line, "Worker Name") {
inWorkerSection = true
continue
}
if !inWorkerSection || strings.TrimSpace(line) == "" {
continue
}
// Non-indented lines are new worker entries; indented lines are
// the used-capacity continuation for the previous entry.
if line[0] != ' ' && line[0] != '\t' {
count++
}
}
return count
}
203 changes: 203 additions & 0 deletions pkg/ddc/alluxio/operations/decommission_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
/*
Copyright 2024 The Fluid Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package operations

import (
"errors"
"testing"

"github.com/agiledragon/gomonkey/v2"
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
)

func TestAlluxioFileUtils_DecommissionWorkers(t *testing.T) {

Check failure on line 27 in pkg/ddc/alluxio/operations/decommission_test.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 24 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZ272vOwF0ZThpCSw4Vx&open=AZ272vOwF0ZThpCSw4Vx&pullRequest=5805
a := &AlluxioFileUtils{log: fake.NullLogger()}

t.Run("empty address list is a no-op", func(t *testing.T) {
if err := a.DecommissionWorkers(nil); err != nil {
t.Fatalf("want nil, got: %v", err)
}
if err := a.DecommissionWorkers([]string{}); err != nil {
t.Fatalf("want nil, got: %v", err)
}
})

t.Run("exec error is propagated", func(t *testing.T) {
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) {
return "", "", errors.New("exec failed")
})
defer patches.Reset()

if err := a.DecommissionWorkers([]string{"192.168.1.1:29999"}); err == nil {
t.Error("want error, got nil")
}
})

t.Run("address is forwarded to the alluxio CLI", func(t *testing.T) {
var capturedCmd []string
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) {
capturedCmd = cmd
return "", "", nil
})
defer patches.Reset()

addr := "192.168.1.1:29999"
if err := a.DecommissionWorkers([]string{addr}); err != nil {
t.Fatalf("want nil, got: %v", err)
}
found := false
for _, arg := range capturedCmd {
if arg == addr {
found = true
break
}
}
if !found {
t.Errorf("address %q not found in command: %v", addr, capturedCmd)
}
})

t.Run("multiple addresses are joined with commas", func(t *testing.T) {
var capturedCmd []string
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) {
capturedCmd = cmd
return "", "", nil
})
defer patches.Reset()

if err := a.DecommissionWorkers([]string{"10.0.0.1:29999", "10.0.0.2:29999"}); err != nil {
t.Fatalf("want nil, got: %v", err)
}
found := false
for _, arg := range capturedCmd {
if arg == "10.0.0.1:29999,10.0.0.2:29999" {
found = true
break
}
}
if !found {
t.Errorf("joined addresses not found in command: %v", capturedCmd)
}
})
}

func TestAlluxioFileUtils_CountActiveWorkers(t *testing.T) {
a := &AlluxioFileUtils{log: fake.NullLogger()}

t.Run("exec error returns zero and the error", func(t *testing.T) {
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) {
return "", "", errors.New("exec failed")
})
defer patches.Reset()

count, err := a.CountActiveWorkers()
if err == nil {
t.Error("want error, got nil")
}
if count != 0 {
t.Errorf("want 0 on error, got %d", count)
}
})

t.Run("two active workers", func(t *testing.T) {
report := `Capacity information for all workers:
Total Capacity: 4096.00MB
Used Capacity: 443.89MB

Worker Name Last Heartbeat Storage MEM
192.168.1.147 0 capacity 2048.00MB
used 443.89MB (21%)
192.168.1.146 0 capacity 2048.00MB
used 0B (0%)
`
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) {
return report, "", nil
})
defer patches.Reset()

count, err := a.CountActiveWorkers()
if err != nil {
t.Fatalf("want nil, got: %v", err)
}
if count != 2 {
t.Errorf("want 2, got %d", count)
}
})
}

func TestParseActiveWorkerCount(t *testing.T) {
cases := []struct {
name string
input string
expect int
}{
{
name: "empty report",
input: "",
expect: 0,
},
{
name: "no worker section header",
input: "Capacity information for all workers:\n Total Capacity: 0B\n",
expect: 0,
},
{
name: "single worker",
input: `Worker Name Last Heartbeat Storage MEM
192.168.1.1 0 capacity 1024.00MB
used 0B (0%)
`,
expect: 1,
},
{
name: "three workers",
input: `Worker Name Last Heartbeat Storage MEM
10.0.0.1 0 capacity 2048.00MB
used 100MB (5%)
10.0.0.2 0 capacity 2048.00MB
used 0B (0%)
10.0.0.3 0 capacity 2048.00MB
used 500MB (25%)
`,
expect: 3,
},
{
name: "trailing blank lines are ignored",
input: `Worker Name Last Heartbeat Storage MEM
10.0.0.1 0 capacity 1024.00MB
used 0B (0%)


`,
expect: 1,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
got := parseActiveWorkerCount(tc.input)
if got != tc.expect {
t.Errorf("want %d, got %d", tc.expect, got)
}
})
}
}
Loading
Loading