diff --git a/flytecopilot/data/retry.go b/flytecopilot/data/retry.go new file mode 100644 index 00000000000..a8e1e9109f3 --- /dev/null +++ b/flytecopilot/data/retry.go @@ -0,0 +1,41 @@ +package data + +import ( + "context" + "time" + + "github.com/flyteorg/flyte/flytestdlib/logger" +) + +const ( + uploadFileRetryMaxAttemptIndex = 5 + uploadFileRetryDelay = 2 * time.Second +) + +// See flyteadmin/pkg/async.RetryOnSpecificErrors +func retryOnSpecificErrors(ctx context.Context, attempts int, delay time.Duration, f func() error, isErrorRetryable func(error) bool) error { + var err error + for attempt := 0; attempt <= attempts; attempt++ { + err = f() + if err == nil { + return nil + } + if !isErrorRetryable(err) { + return err + } + if attempt == attempts { + return err + } + logger.Warningf(ctx, "Failed [%v] on attempt %d of %d; retrying after %v", err, attempt, attempts, delay) + select { + case <-time.After(delay): + case <-ctx.Done(): + return ctx.Err() + } + } + return err +} + +func retryOnAllErrors(err error) bool { + return true +} diff --git a/flytecopilot/data/retry_test.go b/flytecopilot/data/retry_test.go new file mode 100644 index 00000000000..9be78ad0364 --- /dev/null +++ b/flytecopilot/data/retry_test.go @@ -0,0 +1,41 @@ +package data + +import ( + "context" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +var s3LikeError = errors.Wrap(errors.New( + `Put "https://example.s3.us-west-2.amazonaws.com/key?partNumber=11&uploadId=x": http: server closed idle connection`), + `PutObject, putting object: MultipartUpload: upload multipart failed +caused by: RequestError: send request failed +caused by: Put "https://example.s3.us-west-2.amazonaws.com/key?partNumber=11&uploadId=x"`) + +func TestRetryOnSpecificErrors_SucceedsFirstTry(t *testing.T) { + ctx := context.Background() + calls := 0 + err := retryOnSpecificErrors(ctx, 3, time.Millisecond, func() error { + calls++ + return nil + }, retryOnAllErrors) + assert.NoError(t, err) + assert.Equal(t, 1, calls) +} + +func TestRetryOnSpecificErrors_RetriesThenSucceeds(t *testing.T) { + ctx := context.Background() + calls := 0 + err := retryOnSpecificErrors(ctx, 5, time.Millisecond, func() error { + calls++ + if calls < 3 { + return s3LikeError + } + return nil + }, retryOnAllErrors) + assert.NoError(t, err) + assert.Equal(t, 3, calls) +} diff --git a/flytecopilot/data/utils.go b/flytecopilot/data/utils.go index 923c75f908b..0b229118a2e 100644 --- a/flytecopilot/data/utils.go +++ b/flytecopilot/data/utils.go @@ -46,17 +46,18 @@ func IsFileReadable(fpath string, ignoreExtension bool) (string, os.FileInfo, er // Uploads a file to the data store. func UploadFileToStorage(ctx context.Context, filePath string, toPath storage.DataReference, size int64, store *storage.DataStore) error { - f, err := os.Open(filePath) - if err != nil { - return err - } - defer func() { - err := f.Close() + return retryOnSpecificErrors(ctx, uploadFileRetryMaxAttemptIndex, uploadFileRetryDelay, func() error { + f, err := os.Open(filePath) if err != nil { - logger.Errorf(ctx, "failed to close blob file at path [%s]", filePath) + return err } - }() - return store.WriteRaw(ctx, toPath, size, storage.Options{}, f) + defer func() { + if cerr := f.Close(); cerr != nil { + logger.Errorf(ctx, "failed to close blob file at path [%s]", filePath) + } + }() + return store.WriteRaw(ctx, toPath, size, storage.Options{}, f) + }, retryOnAllErrors) } func DownloadFileFromStorage(ctx context.Context, ref storage.DataReference, store *storage.DataStore) (io.ReadCloser, error) {