Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions agent/agent_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,10 @@ type AgentConfiguration struct {
AllowMultipartArtifactUpload bool

PingMode string

// LogStreamingInterval controls how frequently accumulated log output is
// uploaded as a chunk. Larger values reduce API calls and improve
// compression, at the cost of slightly delayed log rendering in the UI.
// When 0 the agent flushes on every poll tick (original behaviour).
LogStreamingInterval time.Duration
}
1 change: 1 addition & 0 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ func NewJobRunner(ctx context.Context, l logger.Logger, apiClient *api.Client, c
Concurrency: 3,
MaxChunkSizeBytes: r.conf.Job.ChunksMaxSizeBytes,
MaxSizeBytes: r.conf.Job.LogMaxSizeBytes,
MaxChunkAge: r.conf.AgentConfiguration.LogStreamingInterval,
},
)

Expand Down
201 changes: 157 additions & 44 deletions agent/log_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/buildkite/agent/v3/api"
"github.com/buildkite/agent/v3/logger"
Expand All @@ -26,6 +27,13 @@ type LogStreamerConfig struct {
// The maximum size of each chunk
MaxChunkSizeBytes uint64

// MaxChunkAge controls how frequently accumulated log data is flushed to
// the upload queue. When 0, data is flushed on every Process call
// (original behaviour). When >0, data accumulates between flushes,
// reducing the number of API calls at the cost of a small latency increase.
// The first flush always happens within 1s to keep the UI responsive.
MaxChunkAge time.Duration

// The maximum size of the log
MaxSizeBytes uint64
}
Expand Down Expand Up @@ -58,6 +66,30 @@ type LogStreamer struct {
// Counts workers that are still running
workerWG sync.WaitGroup

// Normally storing a context in a struct is a bad idea. It's explicitly
// called out in pkg.go.dev/context as something to avoid, because it can
// blur ownership and make it unclear which context should govern a given
// unit of work.
//
// Here the ownership boundary is explicit:
//
// - workerCtx applies only to uploader worker lifetime
// - it is independent of the job/run context passed to Start
// - the job/run context controls normal timed flushing
// - Stop controls final draining and worker shutdown
//
// The reason for separating those concerns is that the job context may be
// cancelled before cleanup runs. If uploader workers inherited that context,
// they could exit before Stop flushes pending chunks, which would risk
// dropped logs or deadlock while enqueueing final chunks.
//
// workerCtx therefore keeps uploader workers alive until Stop finishes
// draining the queue, even if the job context has already been cancelled.
workerCtx context.Context

// workerCancel cancels workerCtx after shutdown has completed.
workerCancel context.CancelFunc

// Only allow processing one at a time
processMutex sync.Mutex

Expand All @@ -66,6 +98,13 @@ type LogStreamer struct {

// Have we stopped?
stopped bool

// pending holds data not yet enqueued as a chunk.
// Guarded by processMutex.
pending []byte

// stopCh is closed by Stop to signal the age flush goroutine to exit.
stopCh chan struct{}
}

// NewLogStreamer creates a new instance of the log streamer.
Expand All @@ -74,11 +113,15 @@ func NewLogStreamer(
callback func(context.Context, *api.Chunk) error,
conf LogStreamerConfig,
) *LogStreamer {
workerCtx, workerCancel := context.WithCancel(context.Background())
return &LogStreamer{
logger: agentLogger,
conf: conf,
callback: callback,
queue: make(chan *api.Chunk, 1024),
logger: agentLogger,
conf: conf,
callback: callback,
queue: make(chan *api.Chunk, 1024),
stopCh: make(chan struct{}),
workerCtx: workerCtx,
workerCancel: workerCancel,
}
}

Expand All @@ -93,7 +136,11 @@ func (ls *LogStreamer) Start(ctx context.Context) error {
}

for i := range ls.conf.Concurrency {
ls.workerWG.Go(func() { ls.worker(ctx, i) })
ls.workerWG.Go(func() { ls.worker(ls.workerCtx, i) })
}

if ls.conf.MaxChunkAge > 0 {
ls.workerWG.Go(func() { ls.ageFlushWorker(ctx) })
}

return nil
Expand All @@ -115,48 +162,28 @@ func (ls *LogStreamer) Process(ctx context.Context, output []byte) error {
return errStreamerStopped
}

for len(output) > 0 {
// Have we exceeded the max size?
// (This check is also performed on the server side.)
if ls.bytes > ls.conf.MaxSizeBytes && !ls.warnedAboutSize {
ls.logger.Warn("The job log has reached %s in size, which has "+
"exceeded the maximum size (%s). Further logs may be dropped "+
"by the server, and a future version of the agent will stop "+
"sending logs at this point.",
humanize.IBytes(ls.bytes), humanize.IBytes(ls.conf.MaxSizeBytes))
ls.warnedAboutSize = true
// In a future version, this will error out, e.g.:
// return fmt.Errorf("%w (%d > %d)", errLogExceededMaxSize, ls.bytes, ls.conf.MaxSizeBytes)
}

// The next chunk will be up to MaxChunkSizeBytes in size.
size := ls.conf.MaxChunkSizeBytes
if lenout := uint64(len(output)); size > lenout {
size = lenout
}
// Have we exceeded the max size?
// (This check is also performed on the server side.)
if ls.bytes+uint64(len(ls.pending))+uint64(len(output)) > ls.conf.MaxSizeBytes && !ls.warnedAboutSize {
ls.logger.Warn("The job log has reached %s in size, which has "+
"exceeded the maximum size (%s). Further logs may be dropped "+
"by the server, and a future version of the agent will stop "+
"sending logs at this point.",
humanize.IBytes(ls.bytes), humanize.IBytes(ls.conf.MaxSizeBytes))
ls.warnedAboutSize = true
// In a future version, this will error out, e.g.:
// return fmt.Errorf("%w (%d > %d)", errLogExceededMaxSize, ls.bytes, ls.conf.MaxSizeBytes)
}

// Take the chunk from the start of output, leave the remainder for the
// next iteration.
ls.order++
chunk := &api.Chunk{
Data: output[:size],
Sequence: ls.order,
Offset: ls.bytes,
Size: size,
}
output = output[size:]
ls.pending = append(ls.pending, output...)

// Stream the chunk onto the queue!
select {
case ls.queue <- chunk:
// Streamed!
case <-ctx.Done(): // pack it up
return ctx.Err()
}
ls.bytes += size
// When age-based flushing is active the ageFlushWorker owns the upload
// cadence; just accumulate here. Without it, flush immediately (original
// behaviour).
if ls.conf.MaxChunkAge > 0 {
return nil
}

return nil
return ls.flushAllPending(ctx)
}

// Stop stops the streamer.
Expand All @@ -167,11 +194,97 @@ func (ls *LogStreamer) Stop() {
return
}
ls.stopped = true
close(ls.stopCh)
// Flush any pending data before closing the queue. Use a background
// context since the job context may already be cancelled at this point.
_ = ls.flushAllPending(context.Background())
close(ls.queue)
ls.processMutex.Unlock()

ls.logger.Debug("[LogStreamer] Waiting for workers to shut down")
ls.workerWG.Wait()
ls.workerCancel()
}

// flushAllPending enqueues all pending data as chunks. Must be called with
// processMutex held.
func (ls *LogStreamer) flushAllPending(ctx context.Context) error {
for len(ls.pending) > 0 {
if err := ls.enqueueNextChunk(ctx); err != nil {
return err
}
}
return nil
}

// enqueueNextChunk takes up to MaxChunkSizeBytes from the front of pending
// and sends it to the upload queue. Must be called with processMutex held.
func (ls *LogStreamer) enqueueNextChunk(ctx context.Context) error {
size := ls.conf.MaxChunkSizeBytes
if pending := uint64(len(ls.pending)); size > pending {
size = pending
}

data := make([]byte, size)
copy(data, ls.pending[:size])
ls.pending = ls.pending[size:]
if len(ls.pending) == 0 {
ls.pending = nil // release backing array
}

ls.order++
chunk := &api.Chunk{
Data: data,
Sequence: ls.order,
Offset: ls.bytes,
Size: size,
}

select {
case ls.queue <- chunk:
case <-ctx.Done():
return ctx.Err()
}
ls.bytes += size
return nil
}

// ageFlushWorker flushes pending data on a timer. The first flush uses a short
// 1s delay so the UI is populated quickly, then switches to MaxChunkAge for
// all subsequent flushes.
func (ls *LogStreamer) ageFlushWorker(ctx context.Context) {
flush := func() {
ls.processMutex.Lock()
if !ls.stopped {
_ = ls.flushAllPending(ctx)
}
ls.processMutex.Unlock()
}

// First flush: upload immediately so the job log has visible output within 1s of starting.
initialDelay := min(time.Second, ls.conf.MaxChunkAge)
select {
case <-time.After(initialDelay):
flush()
case <-ls.stopCh:
return
case <-ctx.Done():
return
}

// Subsequent flushes at the configured interval.
ticker := time.NewTicker(ls.conf.MaxChunkAge)
defer ticker.Stop()
for {
select {
case <-ticker.C:
flush()
case <-ls.stopCh:
return
case <-ctx.Done():
return
}
}
}

// The actual log streamer worker
Expand Down
83 changes: 83 additions & 0 deletions agent/log_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"sync"
"testing"
"time"

"github.com/buildkite/agent/v3/api"
"github.com/buildkite/agent/v3/logger"
Expand Down Expand Up @@ -94,3 +95,85 @@ func TestLogStreamer(t *testing.T) {
t.Errorf("after Stop: LogStreamer.Process(ctx, %q) err = %v, want %v", input, err, errStreamerStopped)
}
}

func TestLogStreamerStopFlushesPendingAfterContextCancellation(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
logger := logger.NewConsoleLogger(
logger.NewTextPrinter(os.Stderr),
func(c int) { t.Errorf("exit(%d)", c) },
)

var mu sync.Mutex
var got []*api.Chunk
callback := func(ctx context.Context, chunk *api.Chunk) error {
mu.Lock()
got = append(got, chunk)
mu.Unlock()
return nil
}

ls := NewLogStreamer(logger, callback, LogStreamerConfig{
Concurrency: 1,
MaxChunkSizeBytes: 10,
MaxChunkAge: time.Hour,
})

if err := ls.Start(ctx); err != nil {
t.Fatalf("LogStreamer.Start(ctx) = %v", err)
}

input := "hello world"
if err := ls.Process(ctx, []byte(input)); err != nil {
t.Fatalf("LogStreamer.Process(ctx, %q) = %v", input, err)
}

cancel()
ls.Stop()

want := []*api.Chunk{
{
Data: []byte("hello worl"),
Sequence: 1,
Offset: 0,
Size: 10,
},
{
Data: []byte("d"),
Sequence: 2,
Offset: 10,
Size: 1,
},
}
sort.Slice(got, func(i, j int) bool {
return got[i].Sequence < got[j].Sequence
})
if diff := cmp.Diff(got, want); diff != "" {
t.Fatalf("LogStreamer chunks diff after cancellation (-got +want):\n%s", diff)
}
}

func TestLogStreamerStopIsIdempotent(t *testing.T) {
t.Parallel()

ctx := context.Background()
logger := logger.NewConsoleLogger(
logger.NewTextPrinter(os.Stderr),
func(c int) { t.Errorf("exit(%d)", c) },
)

ls := NewLogStreamer(logger, func(ctx context.Context, chunk *api.Chunk) error {
return nil
}, LogStreamerConfig{
Concurrency: 1,
MaxChunkSizeBytes: 10,
})

if err := ls.Start(ctx); err != nil {
t.Fatalf("LogStreamer.Start(ctx) = %v", err)
}

ls.Stop()
ls.Stop()
}
Loading