Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions flytecopilot/data/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package data

import (
"context"
"time"

"github.com/flyteorg/flyte/flytestdlib/logger"
)

// Retry settings used when uploading a file to the data store.
const (
// uploadFileRetryMaxAttemptIndex is passed as the attempts argument of
// retryOnSpecificErrors. That loop runs attempt indexes 0 through this value
// inclusive, so an upload is tried at most 6 times.
uploadFileRetryMaxAttemptIndex = 5
// uploadFileRetryDelay is the fixed wait between consecutive upload attempts.
uploadFileRetryDelay = 2 * time.Second
)

// retryOnSpecificErrors calls f until it succeeds, fails with an error that
// isErrorRetryable rejects, or the retry budget is used up.
//
// attempts is the highest zero-based attempt index, so f runs at most
// attempts+1 times. A negative value is treated as 0 so that f is always
// invoked at least once instead of being skipped and reported as a success.
// Between attempts the function waits for delay; if ctx is done during that
// wait it returns ctx.Err() without calling f again.
//
// It returns nil on the first successful call, otherwise the error from the
// last call to f (or ctx.Err() when the wait was interrupted).
//
// See flyteadmin/pkg/async.RetryOnSpecificErrors
func retryOnSpecificErrors(ctx context.Context, attempts int, delay time.Duration, f func() error, isErrorRetryable func(error) bool) error {
	if attempts < 0 {
		attempts = 0
	}
	for attempt := 0; ; attempt++ {
		err := f()
		if err == nil {
			return nil
		}
		// Give up on errors the caller does not want retried, and once the
		// final permitted attempt has failed.
		if !isErrorRetryable(err) || attempt >= attempts {
			return err
		}
		// Report human-friendly one-based counters: attempt N of the total
		// number of tries rather than the raw loop index.
		logger.Warningf(ctx, "Failed [%v] on attempt %d of %d; retrying after %v", err, attempt+1, attempts+1, delay)

		// Use an explicit timer so it can be released promptly when the
		// context ends before the delay elapses.
		timer := time.NewTimer(delay)
		select {
		case <-timer.C:
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		}
	}
}

// retryOnAllErrors is a retry predicate that treats every error as retryable.
// The argument is never inspected.
func retryOnAllErrors(_ error) bool {
	const retryable = true
	return retryable
}
41 changes: 41 additions & 0 deletions flytecopilot/data/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package data

import (
"context"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

// s3LikeError is a test fixture: a wrapped error whose messages imitate a
// multipart upload failure caused by the server closing an idle connection.
// The tests return it from the retried function to simulate a failed attempt.
var s3LikeError = errors.Wrap(errors.New(
`Put "https://example.s3.us-west-2.amazonaws.com/key?partNumber=11&uploadId=x": http: server closed idle connection`),
`PutObject, putting object: MultipartUpload: upload multipart failed
caused by: RequestError: send request failed
caused by: Put "https://example.s3.us-west-2.amazonaws.com/key?partNumber=11&uploadId=x"`)

// TestRetryOnSpecificErrors_SucceedsFirstTry verifies that an operation which
// succeeds immediately is invoked exactly once and yields no error.
func TestRetryOnSpecificErrors_SucceedsFirstTry(t *testing.T) {
	invocations := 0
	alwaysOK := func() error {
		invocations++
		return nil
	}

	result := retryOnSpecificErrors(context.Background(), 3, time.Millisecond, alwaysOK, retryOnAllErrors)

	assert.NoError(t, result)
	assert.Equal(t, 1, invocations)
}

func TestRetryOnSpecificErrors_RetriesThenSucceeds(t *testing.T) {
ctx := context.Background()
calls := 0
err := retryOnSpecificErrors(ctx, 5, time.Millisecond, func() error {
calls++
if calls < 3 {
return s3LikeError
}
return nil
}, retryOnAllErrors)
assert.NoError(t, err)
assert.Equal(t, 3, calls)
}
19 changes: 10 additions & 9 deletions flytecopilot/data/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ func IsFileReadable(fpath string, ignoreExtension bool) (string, os.FileInfo, er

// Uploads a file to the data store.
func UploadFileToStorage(ctx context.Context, filePath string, toPath storage.DataReference, size int64, store *storage.DataStore) error {
f, err := os.Open(filePath)
if err != nil {
return err
}
defer func() {
err := f.Close()
return retryOnSpecificErrors(ctx, uploadFileRetryMaxAttemptIndex, uploadFileRetryDelay, func() error {
f, err := os.Open(filePath)
if err != nil {
logger.Errorf(ctx, "failed to close blob file at path [%s]", filePath)
return err
}
}()
return store.WriteRaw(ctx, toPath, size, storage.Options{}, f)
defer func() {
if cerr := f.Close(); cerr != nil {
logger.Errorf(ctx, "failed to close blob file at path [%s]", filePath)
}
}()
return store.WriteRaw(ctx, toPath, size, storage.Options{}, f)
}, retryOnAllErrors)
}

func DownloadFileFromStorage(ctx context.Context, ref storage.DataReference, store *storage.DataStore) (io.ReadCloser, error) {
Expand Down
Loading