Skip to content

Commit 8c7de5c

Browse files
feat: support graceful scale-down for AlluxioRuntime using AdvancedStatefulSet (#4193)
1 parent 82e490e commit 8c7de5c

5 files changed

Lines changed: 425 additions & 0 deletions

File tree

pkg/ddc/alluxio/const.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,10 @@ const (
5757
defaultGracefulShutdownLimits int32 = 3
5858
defaultCleanCacheGracePeriodSeconds int32 = 60
5959

60+
// defaultWorkerRPCPort is the Alluxio worker Thrift RPC port used when the
61+
// runtime spec does not override alluxio.worker.rpc.port.
62+
defaultWorkerRPCPort = 29999
63+
6064
MountConfigStorage = "ALLUXIO_MOUNT_CONFIG_STORAGE"
6165
ConfigmapStorageName = "configmap"
6266
)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
Copyright 2024 The Fluid Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package operations
18+
19+
import "strings"
20+
21+
// DecommissionWorkers signals the Alluxio master to decommission the given
22+
// workers. Each address must be in "<host>:<rpcPort>" form.
23+
// The call is idempotent: re-issuing it against an already-decommissioned
24+
// worker is safe.
25+
func (a AlluxioFileUtils) DecommissionWorkers(addresses []string) error {
26+
if len(addresses) == 0 {
27+
return nil
28+
}
29+
command := []string{
30+
"alluxio", "fsadmin", "decommission",
31+
"--addresses", strings.Join(addresses, ","),
32+
}
33+
_, _, err := a.exec(command, false)
34+
if err != nil {
35+
a.log.Error(err, "AlluxioFileUtils.DecommissionWorkers() failed", "addresses", addresses)
36+
}
37+
return err
38+
}
39+
40+
// CountActiveWorkers returns the number of workers currently tracked by the
41+
// Alluxio master according to "alluxio fsadmin report capacity".
42+
func (a AlluxioFileUtils) CountActiveWorkers() (int, error) {
43+
report, _, err := a.exec([]string{"alluxio", "fsadmin", "report", "capacity"}, false)
44+
if err != nil {
45+
a.log.Error(err, "AlluxioFileUtils.CountActiveWorkers() failed")
46+
return 0, err
47+
}
48+
return parseActiveWorkerCount(report), nil
49+
}
50+
51+
// parseActiveWorkerCount counts workers in the capacity report produced by
52+
// "alluxio fsadmin report capacity". Worker entries begin at the non-indented
53+
// line after the "Worker Name" header; the indented line that follows each
54+
// entry contains the used-capacity detail.
55+
//
56+
// Worker Name Last Heartbeat Storage MEM
57+
// 192.168.1.147 0 capacity 2048.00MB <- worker entry
58+
// used 443.89MB (21%) <- detail, indented
59+
// 192.168.1.146 0 capacity 2048.00MB <- worker entry
60+
// used 0B (0%)
61+
func parseActiveWorkerCount(report string) int {
62+
inWorkerSection := false
63+
count := 0
64+
for _, line := range strings.Split(report, "\n") {
65+
if strings.HasPrefix(line, "Worker Name") {
66+
inWorkerSection = true
67+
continue
68+
}
69+
if !inWorkerSection || strings.TrimSpace(line) == "" {
70+
continue
71+
}
72+
// Non-indented lines are new worker entries; indented lines are
73+
// the used-capacity continuation for the previous entry.
74+
if line[0] != ' ' && line[0] != '\t' {
75+
count++
76+
}
77+
}
78+
return count
79+
}
Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
Copyright 2024 The Fluid Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package operations
18+
19+
import (
20+
"errors"
21+
"testing"
22+
23+
"github.com/agiledragon/gomonkey/v2"
24+
"github.com/fluid-cloudnative/fluid/pkg/utils/fake"
25+
)
26+
27+
func TestAlluxioFileUtils_DecommissionWorkers(t *testing.T) {
28+
a := &AlluxioFileUtils{log: fake.NullLogger()}
29+
30+
t.Run("empty address list is a no-op", func(t *testing.T) {
31+
if err := a.DecommissionWorkers(nil); err != nil {
32+
t.Fatalf("want nil, got: %v", err)
33+
}
34+
if err := a.DecommissionWorkers([]string{}); err != nil {
35+
t.Fatalf("want nil, got: %v", err)
36+
}
37+
})
38+
39+
t.Run("exec error is propagated", func(t *testing.T) {
40+
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
41+
func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) {
42+
return "", "", errors.New("exec failed")
43+
})
44+
defer patches.Reset()
45+
46+
if err := a.DecommissionWorkers([]string{"192.168.1.1:29999"}); err == nil {
47+
t.Error("want error, got nil")
48+
}
49+
})
50+
51+
t.Run("address is forwarded to the alluxio CLI", func(t *testing.T) {
52+
var capturedCmd []string
53+
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
54+
func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) {
55+
capturedCmd = cmd
56+
return "", "", nil
57+
})
58+
defer patches.Reset()
59+
60+
addr := "192.168.1.1:29999"
61+
if err := a.DecommissionWorkers([]string{addr}); err != nil {
62+
t.Fatalf("want nil, got: %v", err)
63+
}
64+
found := false
65+
for _, arg := range capturedCmd {
66+
if arg == addr {
67+
found = true
68+
break
69+
}
70+
}
71+
if !found {
72+
t.Errorf("address %q not found in command: %v", addr, capturedCmd)
73+
}
74+
})
75+
76+
t.Run("multiple addresses are joined with commas", func(t *testing.T) {
77+
var capturedCmd []string
78+
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
79+
func(_ AlluxioFileUtils, cmd []string, _ bool) (string, string, error) {
80+
capturedCmd = cmd
81+
return "", "", nil
82+
})
83+
defer patches.Reset()
84+
85+
if err := a.DecommissionWorkers([]string{"10.0.0.1:29999", "10.0.0.2:29999"}); err != nil {
86+
t.Fatalf("want nil, got: %v", err)
87+
}
88+
found := false
89+
for _, arg := range capturedCmd {
90+
if arg == "10.0.0.1:29999,10.0.0.2:29999" {
91+
found = true
92+
break
93+
}
94+
}
95+
if !found {
96+
t.Errorf("joined addresses not found in command: %v", capturedCmd)
97+
}
98+
})
99+
}
100+
101+
func TestAlluxioFileUtils_CountActiveWorkers(t *testing.T) {
102+
a := &AlluxioFileUtils{log: fake.NullLogger()}
103+
104+
t.Run("exec error returns zero and the error", func(t *testing.T) {
105+
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
106+
func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) {
107+
return "", "", errors.New("exec failed")
108+
})
109+
defer patches.Reset()
110+
111+
count, err := a.CountActiveWorkers()
112+
if err == nil {
113+
t.Error("want error, got nil")
114+
}
115+
if count != 0 {
116+
t.Errorf("want 0 on error, got %d", count)
117+
}
118+
})
119+
120+
t.Run("two active workers", func(t *testing.T) {
121+
report := `Capacity information for all workers:
122+
Total Capacity: 4096.00MB
123+
Used Capacity: 443.89MB
124+
125+
Worker Name Last Heartbeat Storage MEM
126+
192.168.1.147 0 capacity 2048.00MB
127+
used 443.89MB (21%)
128+
192.168.1.146 0 capacity 2048.00MB
129+
used 0B (0%)
130+
`
131+
patches := gomonkey.ApplyFunc(AlluxioFileUtils.exec,
132+
func(_ AlluxioFileUtils, _ []string, _ bool) (string, string, error) {
133+
return report, "", nil
134+
})
135+
defer patches.Reset()
136+
137+
count, err := a.CountActiveWorkers()
138+
if err != nil {
139+
t.Fatalf("want nil, got: %v", err)
140+
}
141+
if count != 2 {
142+
t.Errorf("want 2, got %d", count)
143+
}
144+
})
145+
}
146+
147+
func TestParseActiveWorkerCount(t *testing.T) {
148+
cases := []struct {
149+
name string
150+
input string
151+
expect int
152+
}{
153+
{
154+
name: "empty report",
155+
input: "",
156+
expect: 0,
157+
},
158+
{
159+
name: "no worker section header",
160+
input: "Capacity information for all workers:\n Total Capacity: 0B\n",
161+
expect: 0,
162+
},
163+
{
164+
name: "single worker",
165+
input: `Worker Name Last Heartbeat Storage MEM
166+
192.168.1.1 0 capacity 1024.00MB
167+
used 0B (0%)
168+
`,
169+
expect: 1,
170+
},
171+
{
172+
name: "three workers",
173+
input: `Worker Name Last Heartbeat Storage MEM
174+
10.0.0.1 0 capacity 2048.00MB
175+
used 100MB (5%)
176+
10.0.0.2 0 capacity 2048.00MB
177+
used 0B (0%)
178+
10.0.0.3 0 capacity 2048.00MB
179+
used 500MB (25%)
180+
`,
181+
expect: 3,
182+
},
183+
{
184+
name: "trailing blank lines are ignored",
185+
input: `Worker Name Last Heartbeat Storage MEM
186+
10.0.0.1 0 capacity 1024.00MB
187+
used 0B (0%)
188+
189+
190+
`,
191+
expect: 1,
192+
},
193+
}
194+
195+
for _, tc := range cases {
196+
t.Run(tc.name, func(t *testing.T) {
197+
got := parseActiveWorkerCount(tc.input)
198+
if got != tc.expect {
199+
t.Errorf("want %d, got %d", tc.expect, got)
200+
}
201+
})
202+
}
203+
}

0 commit comments

Comments
 (0)