From a08b1ef2a5d359a3f9ea47462efe9b38be46eb38 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Mon, 20 Apr 2026 16:54:21 +1000 Subject: [PATCH 1/2] feat: add tunable log streaming interval to reduce chunk upload API calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a --log-streaming-interval flag (BUILDKITE_LOG_STREAMING_INTERVAL, default 3s) that controls how long the log streamer accumulates output before uploading a chunk. Larger intervals mean fewer, bigger chunks — reducing API calls, improving gzip compression ratios, and lowering egress costs for agents behind proxies. The first chunk of each job is always uploaded within 1s so the job log has visible output promptly. Set to 0 to restore the original per-poll-tick behaviour. --- agent/agent_configuration.go | 6 ++ agent/job_runner.go | 1 + agent/log_streamer.go | 163 ++++++++++++++++++++++++++--------- clicommand/agent_start.go | 9 ++ 4 files changed, 140 insertions(+), 39 deletions(-) diff --git a/agent/agent_configuration.go b/agent/agent_configuration.go index 3f1220e6a0..430a466779 100644 --- a/agent/agent_configuration.go +++ b/agent/agent_configuration.go @@ -77,4 +77,10 @@ type AgentConfiguration struct { AllowMultipartArtifactUpload bool PingMode string + + // LogStreamingInterval controls how frequently accumulated log output is + // uploaded as a chunk. Larger values reduce API calls and improve + // compression, at the cost of slightly delayed log rendering in the UI. + // When 0 the agent flushes on every poll tick (original behaviour). + LogStreamingInterval time.Duration } diff --git a/agent/job_runner.go b/agent/job_runner.go index e6ee7498c2..602aef5f60 100644 --- a/agent/job_runner.go +++ b/agent/job_runner.go @@ -203,6 +203,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient *api.Client, c Concurrency: 3, MaxChunkSizeBytes: r.conf.Job.ChunksMaxSizeBytes, MaxSizeBytes: r.conf.Job.LogMaxSizeBytes, + MaxChunkAge: r.conf.AgentConfiguration.LogStreamingInterval, }, ) diff --git a/agent/log_streamer.go b/agent/log_streamer.go index 07c6fd8806..cc2c6522ef 100644 --- a/agent/log_streamer.go +++ b/agent/log_streamer.go @@ -6,6 +6,7 @@ import ( "fmt" "sync" "sync/atomic" + "time" "github.com/buildkite/agent/v3/api" "github.com/buildkite/agent/v3/logger" @@ -26,6 +27,13 @@ type LogStreamerConfig struct { // The maximum size of each chunk MaxChunkSizeBytes uint64 + // MaxChunkAge controls how frequently accumulated log data is flushed to + // the upload queue. When 0, data is flushed on every Process call + // (original behaviour). When >0, data accumulates between flushes, + // reducing the number of API calls at the cost of a small latency increase. + // The first flush always happens within 1s to keep the UI responsive. + MaxChunkAge time.Duration + // The maximum size of the log MaxSizeBytes uint64 } @@ -66,6 +74,13 @@ type LogStreamer struct { // Have we stopped? stopped bool + + // pending holds data not yet enqueued as a chunk. + // Guarded by processMutex. + pending []byte + + // stopCh is closed by Stop to signal the age flush goroutine to exit. + stopCh chan struct{} } // NewLogStreamer creates a new instance of the log streamer. @@ -79,6 +94,7 @@ func NewLogStreamer( conf: conf, callback: callback, queue: make(chan *api.Chunk, 1024), + stopCh: make(chan struct{}), } } @@ -96,6 +112,10 @@ func (ls *LogStreamer) Start(ctx context.Context) error { ls.workerWG.Go(func() { ls.worker(ctx, i) }) } + if ls.conf.MaxChunkAge > 0 { + ls.workerWG.Go(func() { ls.ageFlushWorker(ctx) }) + } + return nil } @@ -115,48 +135,28 @@ func (ls *LogStreamer) Process(ctx context.Context, output []byte) error { return errStreamerStopped } - for len(output) > 0 { - // Have we exceeded the max size? - // (This check is also performed on the server side.) - if ls.bytes > ls.conf.MaxSizeBytes && !ls.warnedAboutSize { - ls.logger.Warn("The job log has reached %s in size, which has "+ - "exceeded the maximum size (%s). Further logs may be dropped "+ - "by the server, and a future version of the agent will stop "+ - "sending logs at this point.", - humanize.IBytes(ls.bytes), humanize.IBytes(ls.conf.MaxSizeBytes)) - ls.warnedAboutSize = true - // In a future version, this will error out, e.g.: - // return fmt.Errorf("%w (%d > %d)", errLogExceededMaxSize, ls.bytes, ls.conf.MaxSizeBytes) - } - - // The next chunk will be up to MaxChunkSizeBytes in size. - size := ls.conf.MaxChunkSizeBytes - if lenout := uint64(len(output)); size > lenout { - size = lenout - } + // Have we exceeded the max size? + // (This check is also performed on the server side.) + if ls.bytes+uint64(len(ls.pending))+uint64(len(output)) > ls.conf.MaxSizeBytes && !ls.warnedAboutSize { + ls.logger.Warn("The job log has reached %s in size, which has "+ + "exceeded the maximum size (%s). Further logs may be dropped "+ + "by the server, and a future version of the agent will stop "+ + "sending logs at this point.", + humanize.IBytes(ls.bytes), humanize.IBytes(ls.conf.MaxSizeBytes)) + ls.warnedAboutSize = true + // In a future version, this will error out, e.g.: + // return fmt.Errorf("%w (%d > %d)", errLogExceededMaxSize, ls.bytes, ls.conf.MaxSizeBytes) + } - // Take the chunk from the start of output, leave the remainder for the - // next iteration. - ls.order++ - chunk := &api.Chunk{ - Data: output[:size], - Sequence: ls.order, - Offset: ls.bytes, - Size: size, - } - output = output[size:] + ls.pending = append(ls.pending, output...) - // Stream the chunk onto the queue! - select { - case ls.queue <- chunk: - // Streamed! - case <-ctx.Done(): // pack it up - return ctx.Err() - } - ls.bytes += size + // When age-based flushing is active the ageFlushWorker owns the upload + // cadence; just accumulate here. Without it, flush immediately (original + // behaviour). + if ls.conf.MaxChunkAge > 0 { + return nil } - - return nil + return ls.flushAllPending(ctx) } // Stop stops the streamer. @@ -167,6 +167,10 @@ func (ls *LogStreamer) Stop() { return } ls.stopped = true + close(ls.stopCh) + // Flush any pending data before closing the queue. Use a background + // context since the job context may already be cancelled at this point. + _ = ls.flushAllPending(context.Background()) close(ls.queue) ls.processMutex.Unlock() @@ -174,6 +178,87 @@ func (ls *LogStreamer) Stop() { ls.workerWG.Wait() } +// flushAllPending enqueues all pending data as chunks. Must be called with +// processMutex held. +func (ls *LogStreamer) flushAllPending(ctx context.Context) error { + for len(ls.pending) > 0 { + if err := ls.enqueueNextChunk(ctx); err != nil { + return err + } + } + return nil +} + +// enqueueNextChunk takes up to MaxChunkSizeBytes from the front of pending +// and sends it to the upload queue. Must be called with processMutex held. +func (ls *LogStreamer) enqueueNextChunk(ctx context.Context) error { + size := ls.conf.MaxChunkSizeBytes + if pending := uint64(len(ls.pending)); size > pending { + size = pending + } + + data := make([]byte, size) + copy(data, ls.pending[:size]) + ls.pending = ls.pending[size:] + if len(ls.pending) == 0 { + ls.pending = nil // release backing array + } + + ls.order++ + chunk := &api.Chunk{ + Data: data, + Sequence: ls.order, + Offset: ls.bytes, + Size: size, + } + + select { + case ls.queue <- chunk: + case <-ctx.Done(): + return ctx.Err() + } + ls.bytes += size + return nil +} + +// ageFlushWorker flushes pending data on a timer. The first flush uses a short +// 1s delay so the UI is populated quickly, then switches to MaxChunkAge for +// all subsequent flushes. +func (ls *LogStreamer) ageFlushWorker(ctx context.Context) { + flush := func() { + ls.processMutex.Lock() + if !ls.stopped { + _ = ls.flushAllPending(ctx) + } + ls.processMutex.Unlock() + } + + // First flush: upload immediately so the job log has visible output within 1s of starting. + initialDelay := min(time.Second, ls.conf.MaxChunkAge) + select { + case <-time.After(initialDelay): + flush() + case <-ls.stopCh: + return + case <-ctx.Done(): + return + } + + // Subsequent flushes at the configured interval. + ticker := time.NewTicker(ls.conf.MaxChunkAge) + defer ticker.Stop() + for { + select { + case <-ticker.C: + flush() + case <-ls.stopCh: + return + case <-ctx.Done(): + return + } + } +} + // The actual log streamer worker func (ls *LogStreamer) worker(ctx context.Context, id int) { ls.logger.Debug("[LogStreamer/Worker#%d] Worker is starting...", id) diff --git a/clicommand/agent_start.go b/clicommand/agent_start.go index 9b42e39783..6c2870eb9a 100644 --- a/clicommand/agent_start.go +++ b/clicommand/agent_start.go @@ -186,6 +186,8 @@ type AgentStartConfig struct { HealthCheckAddr string `cli:"health-check-addr"` + LogStreamingInterval time.Duration `cli:"log-streaming-interval"` + // Datadog statsd metrics config MetricsDatadog bool `cli:"metrics-datadog"` MetricsDatadogHost string `cli:"metrics-datadog-host"` @@ -566,6 +568,12 @@ var AgentStartCommand = cli.Command{ Usage: "Start an HTTP server on this addr:port that returns whether the agent is healthy, disabled by default", EnvVar: "BUILDKITE_AGENT_HEALTH_CHECK_ADDR", }, + cli.DurationFlag{ + Name: "log-streaming-interval", + Usage: "How long to accumulate log output before uploading a chunk. Larger values reduce API calls and improve compression for agents behind proxies or with high egress costs. The first chunk is always uploaded within 1s regardless of this setting. Set to 0 to disable and upload on every poll tick.", + EnvVar: "BUILDKITE_LOG_STREAMING_INTERVAL", + Value: 3 * time.Second, + }, cli.BoolFlag{ Name: "no-pty", Usage: "Do not run jobs within a pseudo terminal (default: false)", @@ -1098,6 +1106,7 @@ var AgentStartCommand = cli.Command{ KubernetesExec: cfg.KubernetesExec, KubernetesContainerStartTimeout: cfg.KubernetesContainerStartTimeout, PingMode: cfg.PingMode, + LogStreamingInterval: cfg.LogStreamingInterval, SigningJWKSFile: cfg.SigningJWKSFile, SigningJWKSKeyID: cfg.SigningJWKSKeyID, From 716a8c4118264270ec78fc51a0598e5ff2b42ff5 Mon Sep 17 00:00:00 2001 From: Mark Wolfe Date: Mon, 20 Apr 2026 18:36:42 +1000 Subject: [PATCH 2/2] fix: decouple log streamer worker lifetime from job context --- agent/log_streamer.go | 40 +++++++++++++++--- agent/log_streamer_test.go | 83 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+), 6 deletions(-) diff --git a/agent/log_streamer.go b/agent/log_streamer.go index cc2c6522ef..a0b56e1f5f 100644 --- a/agent/log_streamer.go +++ b/agent/log_streamer.go @@ -66,6 +66,30 @@ type LogStreamer struct { // Counts workers that are still running workerWG sync.WaitGroup + // Normally storing a context in a struct is a bad idea. It's explicitly + // called out in pkg.go.dev/context as something to avoid, because it can + // blur ownership and make it unclear which context should govern a given + // unit of work. + // + // Here the ownership boundary is explicit: + // + // - workerCtx applies only to uploader worker lifetime + // - it is independent of the job/run context passed to Start + // - the job/run context controls normal timed flushing + // - Stop controls final draining and worker shutdown + // + // The reason for separating those concerns is that the job context may be + // cancelled before cleanup runs. If uploader workers inherited that context, + // they could exit before Stop flushes pending chunks, which would risk + // dropped logs or deadlock while enqueueing final chunks. + // + // workerCtx therefore keeps uploader workers alive until Stop finishes + // draining the queue, even if the job context has already been cancelled. + workerCtx context.Context + + // workerCancel cancels workerCtx after shutdown has completed. + workerCancel context.CancelFunc + // Only allow processing one at a time processMutex sync.Mutex @@ -89,12 +113,15 @@ func NewLogStreamer( callback func(context.Context, *api.Chunk) error, conf LogStreamerConfig, ) *LogStreamer { + workerCtx, workerCancel := context.WithCancel(context.Background()) return &LogStreamer{ - logger: agentLogger, - conf: conf, - callback: callback, - queue: make(chan *api.Chunk, 1024), - stopCh: make(chan struct{}), + logger: agentLogger, + conf: conf, + callback: callback, + queue: make(chan *api.Chunk, 1024), + stopCh: make(chan struct{}), + workerCtx: workerCtx, + workerCancel: workerCancel, } } @@ -109,7 +136,7 @@ func (ls *LogStreamer) Start(ctx context.Context) error { } for i := range ls.conf.Concurrency { - ls.workerWG.Go(func() { ls.worker(ctx, i) }) + ls.workerWG.Go(func() { ls.worker(ls.workerCtx, i) }) } if ls.conf.MaxChunkAge > 0 { @@ -176,6 +203,7 @@ func (ls *LogStreamer) Stop() { ls.logger.Debug("[LogStreamer] Waiting for workers to shut down") ls.workerWG.Wait() + ls.workerCancel() } // flushAllPending enqueues all pending data as chunks. Must be called with diff --git a/agent/log_streamer_test.go b/agent/log_streamer_test.go index eebb464ffd..445caafb02 100644 --- a/agent/log_streamer_test.go +++ b/agent/log_streamer_test.go @@ -7,6 +7,7 @@ import ( "sort" "sync" "testing" + "time" "github.com/buildkite/agent/v3/api" "github.com/buildkite/agent/v3/logger" @@ -94,3 +95,85 @@ func TestLogStreamer(t *testing.T) { t.Errorf("after Stop: LogStreamer.Process(ctx, %q) err = %v, want %v", input, err, errStreamerStopped) } } + +func TestLogStreamerStopFlushesPendingAfterContextCancellation(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithCancel(context.Background()) + logger := logger.NewConsoleLogger( + logger.NewTextPrinter(os.Stderr), + func(c int) { t.Errorf("exit(%d)", c) }, + ) + + var mu sync.Mutex + var got []*api.Chunk + callback := func(ctx context.Context, chunk *api.Chunk) error { + mu.Lock() + got = append(got, chunk) + mu.Unlock() + return nil + } + + ls := NewLogStreamer(logger, callback, LogStreamerConfig{ + Concurrency: 1, + MaxChunkSizeBytes: 10, + MaxChunkAge: time.Hour, + }) + + if err := ls.Start(ctx); err != nil { + t.Fatalf("LogStreamer.Start(ctx) = %v", err) + } + + input := "hello world" + if err := ls.Process(ctx, []byte(input)); err != nil { + t.Fatalf("LogStreamer.Process(ctx, %q) = %v", input, err) + } + + cancel() + ls.Stop() + + want := []*api.Chunk{ + { + Data: []byte("hello worl"), + Sequence: 1, + Offset: 0, + Size: 10, + }, + { + Data: []byte("d"), + Sequence: 2, + Offset: 10, + Size: 1, + }, + } + sort.Slice(got, func(i, j int) bool { + return got[i].Sequence < got[j].Sequence + }) + if diff := cmp.Diff(got, want); diff != "" { + t.Fatalf("LogStreamer chunks diff after cancellation (-got +want):\n%s", diff) + } +} + +func TestLogStreamerStopIsIdempotent(t *testing.T) { + t.Parallel() + + ctx := context.Background() + logger := logger.NewConsoleLogger( + logger.NewTextPrinter(os.Stderr), + func(c int) { t.Errorf("exit(%d)", c) }, + ) + + ls := NewLogStreamer(logger, func(ctx context.Context, chunk *api.Chunk) error { + return nil + }, LogStreamerConfig{ + Concurrency: 1, + MaxChunkSizeBytes: 10, + }) + + if err := ls.Start(ctx); err != nil { + t.Fatalf("LogStreamer.Start(ctx) = %v", err) + } + + ls.Stop() + ls.Stop() +}