Skip to content

Commit df3d2bb

Browse files
authored
feat: support injecting file prefetcher sidecar to app pods (#4490)
* Add fileprefetcher webhook plugin
  Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
* Fail fast when no valid file prefetcher image is set
  Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
* Fix extra env code logic
  Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
* Add fluid file prefetcher Dockerfile
  Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
* Add license header
  Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
* Use COPY instead of ADD for copying local resources
  Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
* Enable FilePrefetcher by default
  Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
* fix comments
  Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
* Add unit tests for file prefetcher
  Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
---------
Signed-off-by: trafalgarzzz <trafalgarz@outlook.com>
1 parent 441d216 commit df3d2bb

9 files changed

Lines changed: 902 additions & 0 deletions

File tree

charts/fluid/fluid/values.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,13 +319,15 @@ webhook:
319319
- NodeAffinityWithCache
320320
- MountPropagationInjector
321321
- DatasetUsageInjector
322+
- FilePrefetcher
322323
withoutDataset:
323324
- PreferNodesWithoutCache
324325
# serverless webhook plugins
325326
serverless:
326327
withDataset:
327328
- FuseSidecar
328329
- DatasetUsageInjector
330+
- FilePrefetcher
329331
withoutDataset: []
330332
pluginConfig:
331333
- name: NodeAffinityWithCache
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
Copyright 2025 The Fluid Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fileprefetcher
18+
19+
// Names of the environment variables the file prefetcher plugin reads or sets.
const (
	// envKeyFilePrefetcherFileList is set on the sidecar container and holds
	// the semicolon-separated glob paths to prefetch.
	envKeyFilePrefetcherFileList = "FILE_PREFETCHER_FILE_LIST"
	// envKeyFilePrefetcherAsyncPrefetch is set on the sidecar container to the
	// string form of the async-prefetch flag ("true" or "false").
	envKeyFilePrefetcherAsyncPrefetch = "FILE_PREFETCHER_ASYNC_PREFETCH"
	// envKeyFilePrefetcherTimeoutSeconds is set on the sidecar container to the
	// prefetch timeout, in seconds, as a decimal string.
	envKeyFilePrefetcherTimeoutSeconds = "FILE_PREFETCHER_TIMEOUT_SECONDS"

	// envKeyFilePrefetcherImage names the environment variable from which the
	// default sidecar image (repository and tag) is resolved.
	envKeyFilePrefetcherImage = "FILE_PREFETCHER_IMAGE"
)
27+
28+
// Fixed names, paths and defaults used when building the file prefetcher sidecar.
const (
	// filePrefetcherContainerName is the name given to the injected sidecar container.
	filePrefetcherContainerName = "fluid-file-prefetcher"
	// filePrefetcherStatusVolumeName is the name of the emptyDir volume that
	// carries the prefetcher's status file.
	filePrefetcherStatusVolumeName = "fluid-file-prefetcher-status-vol"
	// filePrefetcherStatusVolumeMountPath is where the status volume is mounted,
	// both in the sidecar and (for async prefetch) in the other containers.
	filePrefetcherStatusVolumeMountPath = "/tmp/fluid-file-prefetcher/status"

	// filePrefetcherDefaultFileList is the file-list value used when the pod
	// does not set one; it is expanded to every PVC known to the plugin.
	filePrefetcherDefaultFileList = "<ALL>"
	// filePrefetcherDefaultTimeoutSecondsStr is the timeout (in seconds, as a
	// string) used when the pod does not set one.
	filePrefetcherDefaultTimeoutSecondsStr = "120"
)
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
/*
2+
Copyright 2025 The Fluid Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package fileprefetcher
18+
19+
import (
	"fmt"
	stdlog "log"
	"path"
	"path/filepath"
	"sort"
	"strconv"
	"strings"

	"github.com/fluid-cloudnative/fluid/pkg/common"
	"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
	"github.com/fluid-cloudnative/fluid/pkg/utils/docker"
	"github.com/fluid-cloudnative/fluid/pkg/webhook/plugins/api"
	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
)
36+
37+
// Name is the plugin name stored on every FilePrefetcher handler and reported
// by its GetName method.
const Name string = "FilePrefetcher"

// defaultFilePrefetcherImage is the sidecar image used when a pod does not
// specify one. It stays empty unless init resolves it from the environment.
var defaultFilePrefetcherImage string
40+
41+
func init() {
42+
imageRepo := docker.GetImageRepoFromEnv(envKeyFilePrefetcherImage)
43+
imageTag := docker.GetImageTagFromEnv(envKeyFilePrefetcherImage)
44+
if len(imageRepo) == 0 || len(imageTag) == 0 {
45+
stdlog.Printf("WARNING: env variable %s is not set, file prefetcher image is required in Pod's annotation", envKeyFilePrefetcherImage)
46+
return
47+
}
48+
defaultFilePrefetcherImage = fmt.Sprintf("%s:%s", imageRepo, imageTag)
49+
stdlog.Printf("Found %s value %s, using it as defaultFilePrefetcherImage", envKeyFilePrefetcherImage, defaultFilePrefetcherImage)
50+
}
51+
52+
var _ api.MutatingHandler = &FilePrefetcher{}
53+
54+
type FilePrefetcher struct {
55+
client client.Client
56+
name string
57+
log logr.Logger
58+
}
59+
60+
func NewPlugin(c client.Client, args string) (api.MutatingHandler, error) {
61+
return &FilePrefetcher{
62+
client: c,
63+
name: Name,
64+
log: ctrl.Log.WithName("FilePrefetcher"),
65+
}, nil
66+
}
67+
68+
func (p *FilePrefetcher) GetName() string {
69+
return p.name
70+
}
71+
72+
func (p *FilePrefetcher) Mutate(pod *corev1.Pod, runtimeInfos map[string]base.RuntimeInfoInterface) (shouldStop bool, err error) {
73+
if !common.CheckExpectValue(pod.Annotations, AnnotationFilePrefetcherInject, common.True) {
74+
return false, nil
75+
}
76+
77+
if common.CheckExpectValue(pod.Annotations, AnnotationFilePrefetcherInjectDone, common.True) {
78+
return false, nil
79+
}
80+
81+
config, err := p.buildFilePrefetcherConfig(pod, runtimeInfos)
82+
if err != nil {
83+
p.log.Error(err, "failed to build file prefetcher config")
84+
err = fmt.Errorf("failed to build file prefetcher config: %v", err)
85+
return true, err
86+
}
87+
88+
if len(config.GlobPaths) == 0 {
89+
p.log.Info("Skipping injecting file prefetcher sidecar container because there's no valid file-list defined in annotation", "annotation", AnnotationFilePrefetcherFileList)
90+
return false, nil
91+
}
92+
93+
containerSpec, statusFileVolume := p.buildFilePrefetcherSidecarContainer(config)
94+
if config.AsyncPrefetch {
95+
statusVolumeMount := corev1.VolumeMount{
96+
Name: filePrefetcherStatusVolumeName,
97+
MountPath: filePrefetcherStatusVolumeMountPath,
98+
}
99+
for idx, ctr := range pod.Spec.Containers {
100+
if strings.HasPrefix(ctr.Name, common.FuseContainerName) {
101+
// Skip injecting file prefetcher status volume into fluid's fuse sidecar containers.
102+
continue
103+
}
104+
pod.Spec.Containers[idx].VolumeMounts = append(pod.Spec.Containers[idx].VolumeMounts, statusVolumeMount)
105+
}
106+
}
107+
108+
// Inject file prefetcher container right after fuse sidecar containers, we assume fluid's fuse sidecar container is injected together.
109+
// e.g. before injection: [C1, FUSE1, FUSE2, C2, C3], after injection: [C1, FUSE1, FUSE2, FILEPREFETCHER, C2, C3]
110+
pod.Spec.Containers = p.injectFilePrefetcherSidecar(pod.Spec.Containers, containerSpec)
111+
pod.Spec.Volumes = append(pod.Spec.Volumes, statusFileVolume)
112+
pod.Annotations[AnnotationFilePrefetcherInjectDone] = common.True
113+
114+
return false, nil
115+
}
116+
117+
// filePrefetcherConfig holds everything needed to build the file prefetcher
// sidecar container for one pod.
type filePrefetcherConfig struct {
	// Image is the container image of the sidecar.
	Image string
	// AsyncPrefetch selects asynchronous prefetching; the default is false.
	AsyncPrefetch bool
	// VolumeMountPaths maps a pod volume name to the path at which that
	// volume is mounted inside the sidecar.
	VolumeMountPaths map[string]string
	// GlobPaths lists every path to prefetch as one string, with the
	// individual paths separated by semicolons.
	GlobPaths string
	// TimeoutSeconds is the prefetch timeout, an integer number of seconds.
	TimeoutSeconds int
	// ExtraEnvs holds additional environment variables (name to value) to
	// set on the sidecar.
	ExtraEnvs map[string]string
}
131+
132+
func (p *FilePrefetcher) buildFilePrefetcherConfig(pod *corev1.Pod, runtimeInfos map[string]base.RuntimeInfoInterface) (config filePrefetcherConfig, err error) {
133+
defaultFn := func(keyValues map[string]string, key string, defaultValue string) (value string) {
134+
if value, ok := keyValues[key]; ok {
135+
return value
136+
}
137+
return defaultValue
138+
}
139+
140+
config.Image = defaultFn(pod.Annotations, AnnotationFilePrefetcherImage, defaultFilePrefetcherImage)
141+
if len(config.Image) == 0 {
142+
err = fmt.Errorf("file prefetcher's image is required, set it in pod's annotation \"%s=<image>\"", AnnotationFilePrefetcherImage)
143+
return
144+
}
145+
146+
extraEnvs := map[string]string{}
147+
// extraEnvsStr takes the format like: '<key1>=<value1> <key2>=<value2> <key3>=<value3>'
148+
extraEnvsStr := defaultFn(pod.Annotations, AnnotationFilePrefetcherExtraEnvs, "")
149+
if len(extraEnvsStr) > 0 {
150+
keyValuePairs := strings.Split(extraEnvsStr, " ")
151+
for _, keyValuePair := range keyValuePairs {
152+
kvSlice := strings.Split(keyValuePair, "=")
153+
if len(kvSlice) != 2 {
154+
err = fmt.Errorf("file prefetcher's extra envs is required to be '<key1>=<value1> <key2>=<value2> <key3>=<value3>', but found unexpected key-value pair: %s", keyValuePair)
155+
return
156+
}
157+
extraEnvs[kvSlice[0]] = kvSlice[1]
158+
}
159+
}
160+
config.ExtraEnvs = extraEnvs
161+
162+
fileList := defaultFn(pod.Annotations, AnnotationFilePrefetcherFileList, filePrefetcherDefaultFileList)
163+
if fileList == filePrefetcherDefaultFileList {
164+
pvcNames := make([]string, 0)
165+
for pvcName := range runtimeInfos {
166+
pvcNames = append(pvcNames, fmt.Sprintf("pvc://%s", pvcName))
167+
}
168+
fileList = strings.Join(pvcNames, ";")
169+
}
170+
volumeMountPaths, globPaths := p.parseGlobPathsFromFileList(fileList, pod, runtimeInfos)
171+
config.VolumeMountPaths = volumeMountPaths
172+
config.GlobPaths = strings.Join(globPaths, ";")
173+
174+
asyncPrefetchStr := defaultFn(pod.Annotations, AnnotationFilePrefetcherAsync, "false")
175+
if asyncPrefetch, parseErr := strconv.ParseBool(asyncPrefetchStr); parseErr != nil {
176+
err = fmt.Errorf("invalid value for %s: %s, must either be false or true: %v", AnnotationFilePrefetcherAsync, asyncPrefetchStr, parseErr)
177+
return
178+
} else {
179+
config.AsyncPrefetch = asyncPrefetch
180+
}
181+
182+
timeoutSecondsStr := defaultFn(pod.Annotations, AnnotationFilePrefetcherTimeoutSeconds, filePrefetcherDefaultTimeoutSecondsStr)
183+
if timeoutSeconds, parseErr := strconv.ParseInt(timeoutSecondsStr, 10, 32); parseErr != nil {
184+
err = fmt.Errorf("invalid value for %s: %s, must be of type integer: %v", AnnotationFilePrefetcherTimeoutSeconds, timeoutSecondsStr, parseErr)
185+
return
186+
} else {
187+
config.TimeoutSeconds = int(timeoutSeconds)
188+
}
189+
p.log.V(1).Info("building file prefetcher config", "config", config)
190+
191+
return
192+
}
193+
194+
func (p *FilePrefetcher) parseGlobPathsFromFileList(fileList string, pod *corev1.Pod, runtimeInfos map[string]base.RuntimeInfoInterface) (volumeMountPaths map[string]string, globPaths []string) {
195+
volumeMountPaths = map[string]string{}
196+
globPaths = []string{}
197+
198+
if len(fileList) == 0 {
199+
return
200+
}
201+
202+
uriPaths := strings.Split(fileList, ";")
203+
for _, uriPath := range uriPaths {
204+
if !strings.HasPrefix(uriPath, string(common.VolumeScheme)) {
205+
p.log.Info("skip adding path to prefetch list because it does not start with pvc://", "path", uriPath)
206+
continue
207+
}
208+
// e.g. uriPath="pvc://mypvc/path/to/myfolder/*.pkl" => items=["mypvc", "path", "to", "myfolder", "*.pkl"]
209+
items := strings.Split(strings.TrimPrefix(uriPath, string(common.VolumeScheme)), string(filepath.Separator))
210+
if len(items) == 0 {
211+
p.log.Info("skip adding path to prefetch list because it does not specify a valid persistentVolumeClaim", "path", uriPath)
212+
continue
213+
}
214+
215+
var pvcName, globPath string
216+
if len(items) == 1 {
217+
pvcName = items[0]
218+
globPath = "**"
219+
} else {
220+
pvcName = items[0]
221+
globPath = filepath.Clean(fmt.Sprintf("%c%s", filepath.Separator, filepath.Join(items[1:]...)))
222+
}
223+
224+
if _, ok := runtimeInfos[pvcName]; !ok {
225+
p.log.Info("skip adding path to prefetch list because the persistentVolumeClaim is not managed by Fluid", "path", uriPath)
226+
continue
227+
}
228+
229+
for _, volume := range pod.Spec.Volumes {
230+
if volume.PersistentVolumeClaim != nil && volume.PersistentVolumeClaim.ClaimName == pvcName {
231+
volumeMountPaths[volume.Name] = path.Join("/data", volume.Name)
232+
globPaths = append(globPaths, path.Join(volumeMountPaths[volume.Name], globPath))
233+
}
234+
}
235+
}
236+
237+
return
238+
}
239+
240+
// This function assembles and returns a file prefetcher sidecar container
241+
func (p *FilePrefetcher) buildFilePrefetcherSidecarContainer(config filePrefetcherConfig) (corev1.Container, corev1.Volume) {
242+
volumeMounts := []corev1.VolumeMount{}
243+
for volumeName, mountPath := range config.VolumeMountPaths {
244+
volumeMounts = append(volumeMounts, corev1.VolumeMount{
245+
Name: volumeName,
246+
MountPath: mountPath,
247+
})
248+
}
249+
250+
statusFileVolume := corev1.Volume{
251+
Name: filePrefetcherStatusVolumeName,
252+
VolumeSource: corev1.VolumeSource{
253+
EmptyDir: &corev1.EmptyDirVolumeSource{},
254+
},
255+
}
256+
volumeMounts = append(volumeMounts, corev1.VolumeMount{
257+
Name: statusFileVolume.Name,
258+
MountPath: filePrefetcherStatusVolumeMountPath,
259+
})
260+
261+
containerSpec := corev1.Container{
262+
Name: filePrefetcherContainerName,
263+
Image: config.Image,
264+
Env: []corev1.EnvVar{
265+
{
266+
Name: envKeyFilePrefetcherFileList,
267+
Value: config.GlobPaths,
268+
},
269+
{
270+
Name: envKeyFilePrefetcherAsyncPrefetch,
271+
Value: strconv.FormatBool(config.AsyncPrefetch),
272+
},
273+
{
274+
Name: envKeyFilePrefetcherTimeoutSeconds,
275+
Value: strconv.FormatInt(int64(config.TimeoutSeconds), 10),
276+
},
277+
},
278+
VolumeMounts: volumeMounts,
279+
}
280+
281+
for k, v := range config.ExtraEnvs {
282+
containerSpec.Env = append(containerSpec.Env, corev1.EnvVar{Name: k, Value: v})
283+
}
284+
285+
if !config.AsyncPrefetch {
286+
containerSpec.Lifecycle = &corev1.Lifecycle{
287+
PostStart: &corev1.LifecycleHandler{
288+
Exec: &corev1.ExecAction{
289+
Command: []string{
290+
"bash",
291+
"-c",
292+
`cnt=0; while [[ $cnt -lt $FILE_PREFETCHER_TIMEOUT_SECONDS ]]; do if [[ -e "/tmp/fluid-file-prefetcher/status/prefetcher.status" ]]; then exit 0; fi; cnt=$(expr $cnt + 1); sleep 1; done; echo "time out waiting for prefetching done"; exit 1`,
293+
},
294+
},
295+
},
296+
}
297+
}
298+
299+
return containerSpec, statusFileVolume
300+
}
301+
302+
func (p *FilePrefetcher) injectFilePrefetcherSidecar(oldContainers []corev1.Container, filePrefetcherCtr corev1.Container) (newContainers []corev1.Container) {
303+
lastFuseSidecarIndex := -1
304+
for idx, ctr := range oldContainers {
305+
if strings.HasPrefix(ctr.Name, common.FuseContainerName) {
306+
lastFuseSidecarIndex = idx
307+
}
308+
}
309+
310+
// Insert file prefetcher sidecar after
311+
newContainers = make([]corev1.Container, 0, len(oldContainers)+1)
312+
newContainers = append(newContainers, oldContainers[:lastFuseSidecarIndex+1]...)
313+
newContainers = append(newContainers, filePrefetcherCtr)
314+
newContainers = append(newContainers, oldContainers[lastFuseSidecarIndex+1:]...)
315+
316+
return newContainers
317+
}

0 commit comments

Comments
 (0)