diff --git a/agent/BUILD.bazel b/agent/BUILD.bazel index 767decc001..e0e85be3a2 100644 --- a/agent/BUILD.bazel +++ b/agent/BUILD.bazel @@ -66,6 +66,7 @@ go_library( go_test( name = "agent_test", srcs = [ "agent_worker_test.go", + "artifact_batch_hints_test.go", "fake_api_server_test.go", "gcp_meta_data_test.go", diff --git a/agent/agent_worker.go b/agent/agent_worker.go index 5625351056..c6ff4a5b1f 100644 --- a/agent/agent_worker.go +++ b/agent/agent_worker.go @@ -116,10 +116,20 @@ type AgentWorker struct { // The time when this agent worker started startTime time.Time + // Server-provided hints for artifact upload batching, updated from ping. + artifactBatchHintsMu sync.RWMutex + artifactCreateBatchSize int + artifactUpdateBatchSizeMax int + // disable the delay between pings, to speed up certain testing scenarios noWaitBetweenPingsForTesting bool } +const ( + artifactCreateBatchSizeEnv = "BUILDKITE_ARTIFACT_CREATE_BATCH_SIZE" + artifactUpdateBatchSizeMaxEnv = "BUILDKITE_ARTIFACT_UPDATE_BATCH_SIZE_MAX" +) + type agentWorkerState string const ( @@ -505,6 +515,8 @@ func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error } func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job, ignoreAgentInDispatches *bool) error { + a.applyArtifactBatchHintsToJob(acceptResponse) + a.setBusy(acceptResponse.ID) defer a.setIdle() @@ -549,6 +561,47 @@ func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job, ignor return nil } +func (a *AgentWorker) setArtifactBatchHintsFromPing(ping *api.Ping) { + if ping == nil { + return + } + + a.artifactBatchHintsMu.Lock() + defer a.artifactBatchHintsMu.Unlock() + + if ping.ArtifactCreateBatchSize > 0 { + a.artifactCreateBatchSize = ping.ArtifactCreateBatchSize + } + if ping.ArtifactUpdateBatchSizeMax > 0 { + a.artifactUpdateBatchSizeMax = ping.ArtifactUpdateBatchSizeMax + } +} + +func (a *AgentWorker) artifactBatchHints() (createBatchSize, updateBatchSizeMax int) { + 
a.artifactBatchHintsMu.RLock() + defer a.artifactBatchHintsMu.RUnlock() + + return a.artifactCreateBatchSize, a.artifactUpdateBatchSizeMax +} + +func (a *AgentWorker) applyArtifactBatchHintsToJob(job *api.Job) { + createBatchSize, updateBatchSizeMax := a.artifactBatchHints() + if createBatchSize <= 0 && updateBatchSizeMax <= 0 { + return + } + + if job.Env == nil { + job.Env = make(map[string]string) + } + + if createBatchSize > 0 { + job.Env[artifactCreateBatchSizeEnv] = fmt.Sprint(createBatchSize) + } + if updateBatchSizeMax > 0 { + job.Env[artifactUpdateBatchSizeMaxEnv] = fmt.Sprint(updateBatchSizeMax) + } +} + // Disconnect notifies the Buildkite API that this agent worker/session is // permanently disconnecting. Don't spend long retrying, because we want to // disconnect as fast as possible. diff --git a/agent/agent_worker_ping.go b/agent/agent_worker_ping.go index 7e4957ad03..cf0bb1aac2 100644 --- a/agent/agent_worker_ping.go +++ b/agent/agent_worker_ping.go @@ -210,6 +210,8 @@ func (a *AgentWorker) Ping(ctx context.Context) (jobID, action string, err error // once we've done that, we can do the error handling for pingErr if ping != nil { + a.setArtifactBatchHintsFromPing(ping) + // Is there a message that should be shown in the logs? 
if ping.Message != "" { a.logger.Info(ping.Message) @@ -258,6 +260,7 @@ func (a *AgentWorker) Ping(ctx context.Context) (jobID, action string, err error a.apiClient = newAPIClient a.agent.Endpoint = ping.Endpoint ping = newPing + a.setArtifactBatchHintsFromPing(ping) } } diff --git a/agent/artifact_batch_hints_test.go b/agent/artifact_batch_hints_test.go new file mode 100644 index 0000000000..cb00c9bb33 --- /dev/null +++ b/agent/artifact_batch_hints_test.go @@ -0,0 +1,39 @@ +package agent + +import ( + "testing" + + "github.com/buildkite/agent/v3/api" + "github.com/stretchr/testify/require" +) + +func TestApplyArtifactBatchHintsToJobSetsEnv(t *testing.T) { + t.Parallel() + + worker := &AgentWorker{} + worker.setArtifactBatchHintsFromPing(&api.Ping{ + ArtifactCreateBatchSize: 60, + ArtifactUpdateBatchSizeMax: 240, + }) + + job := &api.Job{} + worker.applyArtifactBatchHintsToJob(job) + + require.Equal(t, "60", job.Env[artifactCreateBatchSizeEnv]) + require.Equal(t, "240", job.Env[artifactUpdateBatchSizeMaxEnv]) +} + +func TestSetArtifactBatchHintsFromPingIgnoresZeroValues(t *testing.T) { + t.Parallel() + + worker := &AgentWorker{} + worker.setArtifactBatchHintsFromPing(&api.Ping{ + ArtifactCreateBatchSize: 45, + ArtifactUpdateBatchSizeMax: 180, + }) + worker.setArtifactBatchHintsFromPing(&api.Ping{}) + + createBatchSize, updateBatchSizeMax := worker.artifactBatchHints() + require.Equal(t, 45, createBatchSize) + require.Equal(t, 180, updateBatchSizeMax) +} diff --git a/api/pings.go b/api/pings.go index 6ba67354ab..aeb01dd1fe 100644 --- a/api/pings.go +++ b/api/pings.go @@ -4,11 +4,13 @@ import "context" // Ping represents a Buildkite Agent API Ping type Ping struct { - Action string `json:"action,omitempty"` - Message string `json:"message,omitempty"` - Job *Job `json:"job,omitempty"` - Endpoint string `json:"endpoint,omitempty"` - RequestHeaders map[string]string `json:"request_headers,omitzero"` // omit nil, keep empty map + Action string `json:"action,omitempty"` + 
Message string `json:"message,omitempty"` + Job *Job `json:"job,omitempty"` + Endpoint string `json:"endpoint,omitempty"` + RequestHeaders map[string]string `json:"request_headers,omitzero"` // omit nil, keep empty map + ArtifactCreateBatchSize int `json:"artifact_create_batch_size,omitempty"` + ArtifactUpdateBatchSizeMax int `json:"artifact_update_batch_size_max,omitempty"` } // Pings the API and returns any work the client needs to perform diff --git a/clicommand/artifact_upload.go b/clicommand/artifact_upload.go index f543ac4eb6..3f71d66d77 100644 --- a/clicommand/artifact_upload.go +++ b/clicommand/artifact_upload.go @@ -82,6 +82,8 @@ type ArtifactUploadConfig struct { GlobResolveFollowSymlinks bool `cli:"glob-resolve-follow-symlinks"` UploadSkipSymlinks bool `cli:"upload-skip-symlinks"` NoMultipartUpload bool `cli:"no-multipart-artifact-upload"` + CreateBatchSize int `cli:"artifact-create-batch-size"` + UpdateBatchSizeMax int `cli:"artifact-update-batch-size-max"` // deprecated FollowSymlinks bool `cli:"follow-symlinks" deprecated-and-renamed-to:"GlobResolveFollowSymlinks"` @@ -125,6 +127,16 @@ var ArtifactUploadCommand = cli.Command{ Usage: "After the glob has been resolved to a list of files to upload, skip uploading those that are symlinks to files (default: false)", EnvVar: "BUILDKITE_ARTIFACT_UPLOAD_SKIP_SYMLINKS", }, + cli.IntFlag{ + Name: "artifact-create-batch-size", + Usage: "Maximum number of artifacts to include in each create-artifacts API request (default: 30)", + EnvVar: "BUILDKITE_ARTIFACT_CREATE_BATCH_SIZE", + }, + cli.IntFlag{ + Name: "artifact-update-batch-size-max", + Usage: "Maximum number of artifact states to include in each update-artifacts API request (default: unlimited)", + EnvVar: "BUILDKITE_ARTIFACT_UPDATE_BATCH_SIZE_MAX", + }, cli.BoolFlag{ // Deprecated Name: "follow-symlinks", Usage: "Follow symbolic links while resolving globs. Note this argument is deprecated. 
Use `--glob-resolve-follow-symlinks` instead (default: false)", @@ -157,6 +169,8 @@ var ArtifactUploadCommand = cli.Command{ // this works as long as the user only sets one of the two flags GlobResolveFollowSymlinks: (cfg.GlobResolveFollowSymlinks || cfg.FollowSymlinks), UploadSkipSymlinks: cfg.UploadSkipSymlinks, + CreateBatchSize: cfg.CreateBatchSize, + UpdateBatchSizeMax: cfg.UpdateBatchSizeMax, }) // Upload the artifacts diff --git a/internal/artifact/BUILD.bazel b/internal/artifact/BUILD.bazel index 46bae8c7d8..480a134f75 100644 --- a/internal/artifact/BUILD.bazel +++ b/internal/artifact/BUILD.bazel @@ -93,6 +93,7 @@ go_test( "artifactory_downloader_test.go", "artifactory_uploader_test.go", "azure_blob_test.go", + "batching_test.go", "bk_uploader_test.go", "download_test.go", "downloader_test.go", @@ -113,5 +114,6 @@ go_test( "@com_github_google_go_cmp//cmp", "@com_github_google_go_cmp//cmp/cmpopts", "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", ], ) diff --git a/internal/artifact/batch_creator.go b/internal/artifact/batch_creator.go index 208043f3d7..0b661dbf9f 100644 --- a/internal/artifact/batch_creator.go +++ b/internal/artifact/batch_creator.go @@ -25,6 +25,10 @@ type BatchCreatorConfig struct { // Whether to allow multipart uploads to the BK-hosted bucket. AllowMultipart bool + + // Number of artifacts in each CreateArtifacts request. + // If zero, a default is used. + CreateBatchSize int } type BatchCreator struct { @@ -48,12 +52,16 @@ func NewArtifactBatchCreator(l logger.Logger, ac APIClient, c BatchCreatorConfig func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { length := len(a.conf.Artifacts) - chunks := 30 + batchSize := a.conf.CreateBatchSize + if batchSize <= 0 { + batchSize = 30 + } + const maxCreateBatchSize = 500 // Split into the artifacts into chunks so we're not uploading a ton of // files at once. 
- for i := 0; i < length; i += chunks { - j := min(i+chunks, length) + for i := 0; i < length; { + j := min(i+batchSize, length) // The artifacts that will be uploaded in this chunk theseArtifacts := a.conf.Artifacts[i:j] @@ -72,6 +80,7 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { a.logger.Info("Creating (%d-%d)/%d artifacts", i, j, length) timeout := a.conf.CreateArtifactsTimeout + saw429 := false // Retry the batch upload a couple of times r := roko.NewRetrier( @@ -87,6 +96,9 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { } creation, resp, err := a.apiClient.CreateArtifacts(ctxTimeout, a.conf.JobID, batch) + if resp != nil && resp.StatusCode == 429 { + saw429 = true + } // the server returns a 403 code if the artifact has exceeded the service quota // Break the retry on any 4xx code except for 429 Too Many Requests. if resp != nil && (resp.StatusCode != 429 && resp.StatusCode >= 400 && resp.StatusCode <= 499) { @@ -121,6 +133,16 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) { theseArtifacts[index].UploadInstructions = specific } } + + if saw429 && batchSize < maxCreateBatchSize { + newBatchSize := min(batchSize*2, maxCreateBatchSize) + if newBatchSize != batchSize { + a.logger.Info("Received 429 while creating artifacts, increasing create batch size from %d to %d", batchSize, newBatchSize) + batchSize = newBatchSize + } + } + + i = j } return a.conf.Artifacts, nil diff --git a/internal/artifact/batching_test.go b/internal/artifact/batching_test.go new file mode 100644 index 0000000000..f8cf8d88a6 --- /dev/null +++ b/internal/artifact/batching_test.go @@ -0,0 +1,119 @@ +package artifact + +import ( + "context" + "errors" + "fmt" + "net/http" + "testing" + + "github.com/buildkite/agent/v3/api" + "github.com/buildkite/agent/v3/logger" + "github.com/stretchr/testify/require" +) + +type stubArtifactAPIClient struct { + createFn func(context.Context, string, 
*api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) + updateFn func(context.Context, string, []api.ArtifactState) (*api.Response, error) +} + +func (s *stubArtifactAPIClient) CreateArtifacts(ctx context.Context, jobID string, batch *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) { + if s.createFn == nil { + panic("unexpected CreateArtifacts call") + } + return s.createFn(ctx, jobID, batch) +} + +func (s *stubArtifactAPIClient) SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error) { + return nil, nil, nil +} + +func (s *stubArtifactAPIClient) UpdateArtifacts(ctx context.Context, jobID string, states []api.ArtifactState) (*api.Response, error) { + if s.updateFn == nil { + panic("unexpected UpdateArtifacts call") + } + return s.updateFn(ctx, jobID, states) +} + +func TestBatchCreatorIncreasesCreateBatchSizeAfter429(t *testing.T) { + t.Parallel() + + artifacts := make([]*api.Artifact, 12) + for i := range artifacts { + artifacts[i] = &api.Artifact{Path: fmt.Sprintf("artifact-%d", i)} + } + + var call int + var batchSizes []int + apiClient := &stubArtifactAPIClient{ + createFn: func(_ context.Context, _ string, batch *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) { + batchSizes = append(batchSizes, len(batch.Artifacts)) + + if call == 0 { + call++ + return nil, &api.Response{Response: &http.Response{StatusCode: http.StatusTooManyRequests, Status: "429 Too Many Requests"}}, errors.New("rate limited") + } + + ids := make([]string, len(batch.Artifacts)) + for i := range ids { + ids[i] = fmt.Sprintf("id-%d-%d", call, i) + } + call++ + + return &api.ArtifactBatchCreateResponse{ + ArtifactIDs: ids, + InstructionsTemplate: &api.ArtifactUploadInstructions{}, + }, &api.Response{Response: &http.Response{StatusCode: http.StatusCreated, Status: "201 Created"}}, nil + }, + } + + creator := NewArtifactBatchCreator(logger.Discard, apiClient, 
BatchCreatorConfig{ + JobID: "job-id", + Artifacts: artifacts, + CreateBatchSize: 5, + }) + + _, err := creator.Create(t.Context()) + require.NoError(t, err) + require.Equal(t, []int{5, 5, 7}, batchSizes) +} + +func TestUpdateStatesRespectsConfiguredBatchMax(t *testing.T) { + t.Parallel() + + var updateSizes []int + apiClient := &stubArtifactAPIClient{ + updateFn: func(_ context.Context, _ string, states []api.ArtifactState) (*api.Response, error) { + updateSizes = append(updateSizes, len(states)) + return &api.Response{Response: &http.Response{StatusCode: http.StatusOK, Status: "200 OK"}}, nil + }, + } + + u := &Uploader{ + conf: UploaderConfig{ + JobID: "job-id", + UpdateBatchSizeMax: 2, + }, + logger: logger.Discard, + apiClient: apiClient, + } + + worker := &artifactUploadWorker{ + Uploader: u, + trackers: map[*api.Artifact]*artifactTracker{}, + } + for i := 0; i < 5; i++ { + artifact := &api.Artifact{ID: fmt.Sprintf("artifact-%d", i)} + worker.trackers[artifact] = &artifactTracker{ + ArtifactState: api.ArtifactState{ID: artifact.ID, State: "finished"}, + } + } + + err := worker.updateStates(t.Context()) + require.NoError(t, err) + require.Equal(t, []int{2, 2, 1}, updateSizes) + + for _, tracker := range worker.trackers { + require.Equal(t, "sent", tracker.State) + } +} diff --git a/internal/artifact/uploader.go b/internal/artifact/uploader.go index 484bdcf9f1..09844ac407 100644 --- a/internal/artifact/uploader.go +++ b/internal/artifact/uploader.go @@ -61,6 +61,13 @@ type UploaderConfig struct { // Whether to allow multipart uploads to the BK-hosted bucket AllowMultipart bool + + // Maximum artifacts in each CreateArtifacts request. If zero, uses the default. + CreateBatchSize int + + // Maximum artifact states in each UpdateArtifacts request. If zero, all pending + // states are sent in one request. 
+ UpdateBatchSizeMax int } type Uploader struct { @@ -114,6 +121,7 @@ func (a *Uploader) Upload(ctx context.Context) error { UploadDestination: a.conf.Destination, CreateArtifactsTimeout: 10 * time.Second, AllowMultipart: a.conf.AllowMultipart, + CreateBatchSize: a.conf.CreateBatchSize, }) artifacts, err = batchCreator.Create(ctx) if err != nil { @@ -742,40 +750,49 @@ func (a *artifactUploadWorker) updateStates(ctx context.Context) error { }) } - // Post the update - timeout := 5 * time.Second + batchSize := a.conf.UpdateBatchSizeMax + if batchSize <= 0 || batchSize > len(statesToUpload) { + batchSize = len(statesToUpload) + } - // Update the states of the artifacts in bulk. - err := roko.NewRetrier( - roko.WithMaxAttempts(10), - roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)), - ).DoWithContext(ctx, func(r *roko.Retrier) error { - ctxTimeout := ctx - if timeout != 0 { - var cancel func() - ctxTimeout, cancel = context.WithTimeout(ctx, timeout) - defer cancel() - } + for i := 0; i < len(statesToUpload); i += batchSize { + j := min(i+batchSize, len(statesToUpload)) + chunk := statesToUpload[i:j] - _, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, statesToUpload) - if err != nil { - a.logger.Warn("%s (%s)", err, r) - } + // Post the update + timeout := 5 * time.Second - // after four attempts (0, 1, 2, 3)... - if r.AttemptCount() == 3 { - // The short timeout has given us fast feedback on the first couple of attempts, - // but perhaps the server needs more time to complete the request, so fall back to - // the default HTTP client timeout. 
- a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout) - timeout = 0 - } + err := roko.NewRetrier( + roko.WithMaxAttempts(10), + roko.WithStrategy(roko.ExponentialSubsecond(500*time.Millisecond)), + ).DoWithContext(ctx, func(r *roko.Retrier) error { + ctxTimeout := ctx + if timeout != 0 { + var cancel func() + ctxTimeout, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } - return err - }) - if err != nil { - a.logger.Error("Error updating artifact states: %v", err) - return err + _, err := a.apiClient.UpdateArtifacts(ctxTimeout, a.conf.JobID, chunk) + if err != nil { + a.logger.Warn("%s (%s)", err, r) + } + + // after four attempts (0, 1, 2, 3)... + if r.AttemptCount() == 3 { + // The short timeout has given us fast feedback on the first couple of attempts, + // but perhaps the server needs more time to complete the request, so fall back to + // the default HTTP client timeout. + a.logger.Debug("UpdateArtifacts timeout (%s) removed for subsequent attempts", timeout) + timeout = 0 + } + + return err + }) + if err != nil { + a.logger.Error("Error updating artifact states: %v", err) + return err + } } for _, tracker := range trackersToMarkSent {