From 63fd5feea99b95075114cf4878706524857d5210 Mon Sep 17 00:00:00 2001 From: AltayAkkus <7849969+AltayAkkus@users.noreply.github.com> Date: Mon, 23 Mar 2026 21:59:21 +0100 Subject: [PATCH 1/2] refactor: verbose inc/decr methods with GaugedCounter --- go.mod | 1 + internal/pkg/archiver/worker.go | 4 +- internal/pkg/postprocessor/postprocessor.go | 4 +- internal/pkg/preprocessor/preprocessor.go | 4 +- internal/pkg/stats/counter.go | 34 +++++++- internal/pkg/stats/counter_test.go | 94 +++++++++++++++++++++ internal/pkg/stats/methods.go | 80 ------------------ internal/pkg/stats/stats.go | 40 +++++---- 8 files changed, 158 insertions(+), 103 deletions(-) diff --git a/go.mod b/go.mod index 015cd618..cb504f16 100644 --- a/go.mod +++ b/go.mod @@ -75,6 +75,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.0.12 // indirect + github.com/kylelemons/godebug v1.1.0 // indirect github.com/lucasb-eyer/go-colorful v1.3.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/internal/pkg/archiver/worker.go b/internal/pkg/archiver/worker.go index 85a1272e..23b8f889 100644 --- a/internal/pkg/archiver/worker.go +++ b/internal/pkg/archiver/worker.go @@ -164,8 +164,8 @@ func (a *archiver) worker(workerID string) { controlChans := pause.Subscribe() defer pause.Unsubscribe(controlChans) - stats.ArchiverRoutinesIncr() - defer stats.ArchiverRoutinesDecr() + stats.ArchiverRoutines.Add(1) + defer stats.ArchiverRoutines.Done() for { select { diff --git a/internal/pkg/postprocessor/postprocessor.go b/internal/pkg/postprocessor/postprocessor.go index 063f003f..a23ddbea 100644 --- a/internal/pkg/postprocessor/postprocessor.go +++ b/internal/pkg/postprocessor/postprocessor.go @@ -72,8 +72,8 @@ func (p *postprocessor) worker(workerID string) { "worker_id": workerID, }) - stats.PostprocessorRoutinesIncr() - defer stats.PostprocessorRoutinesDecr() + stats.PostprocessorRoutines.Add(1) + defer stats.PostprocessorRoutines.Done() // Subscribe to the pause controler controlChans := pause.Subscribe() diff --git a/internal/pkg/preprocessor/preprocessor.go b/internal/pkg/preprocessor/preprocessor.go index 996b9155..4637a5cf 100644 --- a/internal/pkg/preprocessor/preprocessor.go +++ b/internal/pkg/preprocessor/preprocessor.go @@ -114,8 +114,8 @@ func (p *preprocessor) worker(workerID string) { controlChans := pause.Subscribe() defer pause.Unsubscribe(controlChans) - stats.PreprocessorRoutinesIncr() - defer stats.PreprocessorRoutinesDecr() + stats.PreprocessorRoutines.Add(1) + defer stats.PreprocessorRoutines.Done() for { select { diff --git a/internal/pkg/stats/counter.go b/internal/pkg/stats/counter.go index 7af2505e..b780f033 100644 --- a/internal/pkg/stats/counter.go +++ b/internal/pkg/stats/counter.go @@ -1,6 +1,12 @@ package stats -import "sync/atomic" +import ( + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/internetarchive/Zeno/internal/pkg/config" +) type counter struct { count uint64 @@ -21,3 +27,29 @@ func (c *counter) get() uint64 { func (c *counter) reset() { atomic.StoreUint64(&c.count, 0) } + +// A GaugedCounter is a atomic counter + Prometheus gauge +// The promGauge needs to be wired in the Init, although it is optional +// Add(i) and Done() mutate the counter and Prometheus gauge in one call +type GaugedCounter struct { + counter + promGauge *prometheus.GaugeVec +} + +func (gc *GaugedCounter) Add(n uint64) { + gc.incr(n) + if gc.promGauge != nil { + gc.promGauge.WithLabelValues(config.Get().JobPrometheus, hostname, version).Add(float64(n)) + } +} + +func (gc *GaugedCounter) Done() { + gc.decr(1) + if gc.promGauge != nil { + gc.promGauge.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec() + } +} + +func (gc *GaugedCounter) Value() uint64 { + return gc.get() +} diff --git a/internal/pkg/stats/counter_test.go b/internal/pkg/stats/counter_test.go index ed8323f6..25bf50f5 100644 --- a/internal/pkg/stats/counter_test.go +++ b/internal/pkg/stats/counter_test.go @@ -3,6 +3,10 @@ package stats import ( "sync/atomic" "testing" + + "github.com/internetarchive/Zeno/internal/pkg/config" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" ) func TestCounter_Incr(t *testing.T) { @@ -74,3 +78,93 @@ func TestCounter_Reset(t *testing.T) { t.Errorf("expected count to be 0 after reset, got %d", atomic.LoadUint64(&c.count)) } } + +func TestGaugedCounter_AddDoneValue(t *testing.T) { + gc := &GaugedCounter{} + + gc.Add(3) + if gc.Value() != 3 { + t.Errorf("expected 3, got %d", gc.Value()) + } + + gc.Done() + if gc.Value() != 2 { + t.Errorf("expected 2, got %d", gc.Value()) + } + + gc.Add(5) + gc.Done() + gc.Done() + if gc.Value() != 5 { + t.Errorf("expected 5, got %d", gc.Value()) + } +} + +func TestGaugedCounter_NilPrometheus(t *testing.T) { + gc := &GaugedCounter{} + + gc.Add(1) + defer gc.Done() + + if gc.Value() != 1 { + t.Errorf("expected 1, got %d", gc.Value()) + } +} + +func TestGaugedCounter_WithPrometheus(t *testing.T) { + config.Set(&config.Config{JobPrometheus: "testjob"}) + hostname = "testhost" + version = "v0.0.0-test" + + gauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: "test_gauged_counter", Help: "test"}, + []string{"project", "hostname", "version"}, + ) + + gc := &GaugedCounter{promGauge: gauge} + + gc.Add(3) + if gc.Value() != 3 { + t.Errorf("atomic: expected 3, got %d", gc.Value()) + } + promVal := testutil.ToFloat64(gauge.WithLabelValues("testjob", "testhost", "v0.0.0-test")) + if promVal != 3 { + t.Errorf("prometheus: expected 3, got %f", promVal) + } + + gc.Done() + gc.Done() + gc.Done() + if gc.Value() != 0 { + t.Errorf("atomic: expected 0, got %d", gc.Value()) + } + promVal = testutil.ToFloat64(gauge.WithLabelValues("testjob", "testhost", "v0.0.0-test")) + if promVal != 0 { + t.Errorf("prometheus: expected 0, got %f", promVal) + } +} + +func TestGaugedCounter_PrometheusBatchAdd(t *testing.T) { + config.Set(&config.Config{JobPrometheus: "testjob"}) + hostname = "testhost" + version = "v0.0.0-test" + + gauge := prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: "test_gauged_counter_batch", Help: "test"}, + []string{"project", "hostname", "version"}, + ) + + gc := &GaugedCounter{promGauge: gauge} + + gc.Add(10) + gc.Done() + gc.Add(5) + + if gc.Value() != 14 { + t.Errorf("atomic: expected 14, got %d", gc.Value()) + } + promVal := testutil.ToFloat64(gauge.WithLabelValues("testjob", "testhost", "v0.0.0-test")) + if promVal != 14 { + t.Errorf("prometheus: expected 14, got %f", promVal) + } +} diff --git a/internal/pkg/stats/methods.go b/internal/pkg/stats/methods.go index 8617bad2..ed343332 100644 --- a/internal/pkg/stats/methods.go +++ b/internal/pkg/stats/methods.go @@ -31,86 +31,6 @@ func SeedsFinishedIncr() { } } -////////////////////////// -// PreprocessorRoutines // -////////////////////////// - -// PreprocessorRoutinesIncr increments the PreprocessorRoutines counter by 1. -func PreprocessorRoutinesIncr() { - globalStats.PreprocessorRoutines.incr(1) - if globalPromStats != nil { - globalPromStats.preprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc() - } -} - -// PreprocessorRoutinesDecr decrements the PreprocessorRoutines counter by 1. -func PreprocessorRoutinesDecr() { - globalStats.PreprocessorRoutines.decr(1) - if globalPromStats != nil { - globalPromStats.preprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec() - } -} - -////////////////////////// -// ArchiverRoutines // -////////////////////////// - -// ArchiverRoutinesIncr increments the ArchiverRoutines counter by 1. -func ArchiverRoutinesIncr() { - globalStats.ArchiverRoutines.incr(1) - if globalPromStats != nil { - globalPromStats.archiverRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc() - } -} - -// ArchiverRoutinesDecr decrements the ArchiverRoutines counter by 1. -func ArchiverRoutinesDecr() { - globalStats.ArchiverRoutines.decr(1) - if globalPromStats != nil { - globalPromStats.archiverRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec() - } -} - -////////////////////////// -// PostprocessorRoutines // -////////////////////////// - -// PostprocessorRoutinesIncr increments the PostprocessorRoutines counter by 1. -func PostprocessorRoutinesIncr() { - globalStats.PostprocessorRoutines.incr(1) - if globalPromStats != nil { - globalPromStats.postprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc() - } -} - -// PostprocessorRoutinesDecr decrements the PostprocessorRoutines counter by 1. -func PostprocessorRoutinesDecr() { - globalStats.PostprocessorRoutines.decr(1) - if globalPromStats != nil { - globalPromStats.postprocessorRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec() - } -} - -////////////////////////// -// FinisherRoutines // -////////////////////////// - -// FinisherRoutinesIncr increments the FinisherRoutines counter by 1. -func FinisherRoutinesIncr() { - globalStats.FinisherRoutines.incr(1) - if globalPromStats != nil { - globalPromStats.finisherRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Inc() - } -} - -// FinisherRoutinesDecr decrements the FinisherRoutines counter by 1. -func FinisherRoutinesDecr() { - globalStats.FinisherRoutines.decr(1) - if globalPromStats != nil { - globalPromStats.finisherRoutines.WithLabelValues(config.Get().JobPrometheus, hostname, version).Dec() - } -} - ////////////////////////// // Paused // ////////////////////////// diff --git a/internal/pkg/stats/stats.go b/internal/pkg/stats/stats.go index fad242d7..033c30cc 100644 --- a/internal/pkg/stats/stats.go +++ b/internal/pkg/stats/stats.go @@ -13,10 +13,6 @@ import ( type stats struct { URLsCrawled *rate SeedsFinished *rate - PreprocessorRoutines *counter - ArchiverRoutines *counter - PostprocessorRoutines *counter - FinisherRoutines *counter Paused atomic.Bool HTTPReturnCodes *rateBucket SeencheckFailures atomic.Int64 @@ -42,6 +38,11 @@ var ( doOnce sync.Once hostname string version string + + PreprocessorRoutines *GaugedCounter + ArchiverRoutines *GaugedCounter + PostprocessorRoutines *GaugedCounter + FinisherRoutines *GaugedCounter ) func Init() error { @@ -52,16 +53,17 @@ func Init() error { globalStats = &stats{ URLsCrawled: &rate{}, SeedsFinished: &rate{}, - PreprocessorRoutines: &counter{}, - ArchiverRoutines: &counter{}, - PostprocessorRoutines: &counter{}, - FinisherRoutines: &counter{}, HTTPReturnCodes: newRateBucket(), MeanHTTPResponseTime: &mean{}, MeanProcessBodyTime: &mean{}, MeanWaitOnFeedbackTime: &mean{}, } + PreprocessorRoutines = &GaugedCounter{} + ArchiverRoutines = &GaugedCounter{} + PostprocessorRoutines = &GaugedCounter{} + FinisherRoutines = &GaugedCounter{} + if config.Get() != nil && config.Get().Prometheus { globalPromStats = newPrometheusStats() @@ -75,6 +77,12 @@ func Init() error { versionStruct := utils.GetVersion() version = versionStruct.Version + // Wire prometheus gauges into the counters + PreprocessorRoutines.promGauge = globalPromStats.preprocessorRoutines + ArchiverRoutines.promGauge = globalPromStats.archiverRoutines + PostprocessorRoutines.promGauge = globalPromStats.postprocessorRoutines + FinisherRoutines.promGauge = globalPromStats.finisherRoutines + registerPrometheusMetrics() } @@ -95,10 +103,10 @@ func Init() error { func Reset() { globalStats.URLsCrawled.reset() globalStats.SeedsFinished.reset() - globalStats.PreprocessorRoutines.reset() - globalStats.ArchiverRoutines.reset() - globalStats.PostprocessorRoutines.reset() - globalStats.FinisherRoutines.reset() + PreprocessorRoutines.reset() + ArchiverRoutines.reset() + PostprocessorRoutines.reset() + FinisherRoutines.reset() globalStats.HTTPReturnCodes.resetAll() globalStats.MeanHTTPResponseTime.reset() globalStats.MeanProcessBodyTime.reset() @@ -112,10 +120,10 @@ func GetMapTUI() map[string]any { "URL/s": globalStats.URLsCrawled.get(), "Total URL crawled": globalStats.URLsCrawled.getTotal(), "Finished seeds": globalStats.SeedsFinished.getTotal(), - "Preprocessor routines": globalStats.PreprocessorRoutines.get(), - "Archiver routines": globalStats.ArchiverRoutines.get(), - "Postprocessor routines": globalStats.PostprocessorRoutines.get(), - "Finisher routines": globalStats.FinisherRoutines.get(), + "Preprocessor routines": PreprocessorRoutines.Value(), + "Archiver routines": ArchiverRoutines.Value(), + "Postprocessor routines": PostprocessorRoutines.Value(), + "Finisher routines": FinisherRoutines.Value(), "Is paused?": globalStats.Paused.Load(), "HTTP 2xx/s": bucketSum(globalStats.HTTPReturnCodes.getFiltered("2*")), "HTTP 3xx/s": bucketSum(globalStats.HTTPReturnCodes.getFiltered("3*")), From 6978f435e8ae0b5879ca61bc814988632685f96d Mon Sep 17 00:00:00 2001 From: AltayAkkus <7849969+AltayAkkus@users.noreply.github.com> Date: Mon, 23 Mar 2026 22:30:10 +0100 Subject: [PATCH 2/2] feat: add in-transit counters for pipeline components --- internal/pkg/archiver/worker.go | 3 +++ internal/pkg/finisher/finisher.go | 2 ++ internal/pkg/postprocessor/postprocessor.go | 4 +++ internal/pkg/preprocessor/preprocessor.go | 3 +++ internal/pkg/stats/prometheus.go | 28 +++++++++++++++++++++ internal/pkg/stats/stats.go | 23 +++++++++++++++++ 6 files changed, 63 insertions(+) diff --git a/internal/pkg/archiver/worker.go b/internal/pkg/archiver/worker.go index 23b8f889..68836f39 100644 --- a/internal/pkg/archiver/worker.go +++ b/internal/pkg/archiver/worker.go @@ -178,6 +178,7 @@ func (a *archiver) worker(workerID string) { logger.Debug("received resume event") case seed, ok := <-a.inputCh: if ok { + stats.ArchiverInTransit.Add(1) logger.Debug("received seed", "seed", seed.GetShortID(), "depth", seed.GetDepth(), "hops", seed.GetURL().GetHops()) if err := seed.CheckConsistency(); err != nil { @@ -192,10 +193,12 @@ func (a *archiver) worker(workerID string) { select { case <-a.ctx.Done(): + stats.ArchiverInTransit.Done() logger.Debug("aborting seed due to stop", "seed", seed.GetShortID(), "depth", seed.GetDepth(), "hops", seed.GetURL().GetHops()) return case a.outputCh <- seed: } + stats.ArchiverInTransit.Done() } } } diff --git a/internal/pkg/finisher/finisher.go b/internal/pkg/finisher/finisher.go index 0a2ac514..51eed7df 100644 --- a/internal/pkg/finisher/finisher.go +++ b/internal/pkg/finisher/finisher.go @@ -95,9 +95,11 @@ func (f *finisher) worker(workerID string) { logger.Debug("received resume event") case seed, ok := <-f.inputCh: if ok { + stats.FinisherInTransit.Add(1) if err := f.handleSeed(seed, workerID, logger); err != nil { panic(err) } + stats.FinisherInTransit.Done() } } } diff --git a/internal/pkg/postprocessor/postprocessor.go b/internal/pkg/postprocessor/postprocessor.go index a23ddbea..4ff3b660 100644 --- a/internal/pkg/postprocessor/postprocessor.go +++ b/internal/pkg/postprocessor/postprocessor.go @@ -90,6 +90,7 @@ func (p *postprocessor) worker(workerID string) { logger.Debug("received resume event") case seed, ok := <-p.inputCh: if ok { + stats.PostprocessorInTransit.Add(1) logger.Debug("received seed", "seed", seed.GetShortID()) if err := seed.CheckConsistency(); err != nil { @@ -103,6 +104,7 @@ func (p *postprocessor) worker(workerID string) { for i := range outlinks { select { case <-p.ctx.Done(): + stats.PostprocessorInTransit.Done() logger.Debug("aborting outlink feeding due to stop", "seed", outlinks[i].GetShortID()) return case p.outputCh <- outlinks[i]: @@ -115,10 +117,12 @@ func (p *postprocessor) worker(workerID string) { select { case <-p.ctx.Done(): + stats.PostprocessorInTransit.Done() logger.Debug("aborting seed due to stop", "seed", seed.GetShortID()) return case p.outputCh <- seed: } + stats.PostprocessorInTransit.Done() } } } diff --git a/internal/pkg/preprocessor/preprocessor.go b/internal/pkg/preprocessor/preprocessor.go index 4637a5cf..4707b12b 100644 --- a/internal/pkg/preprocessor/preprocessor.go +++ b/internal/pkg/preprocessor/preprocessor.go @@ -128,6 +128,7 @@ func (p *preprocessor) worker(workerID string) { logger.Debug("received resume event") case seed, ok := <-p.inputCh: if ok { + stats.PreprocessorInTransit.Add(1) logger.Debug("received seed", "seed", seed.GetShortID()) if err := seed.CheckConsistency(); err != nil { @@ -144,10 +145,12 @@ func (p *preprocessor) worker(workerID string) { select { case <-p.ctx.Done(): + stats.PreprocessorInTransit.Done() logger.Debug("aborting seed due to stop", "seed", seed.GetShortID()) return case p.outputCh <- seed: } + stats.PreprocessorInTransit.Done() } } } diff --git a/internal/pkg/stats/prometheus.go b/internal/pkg/stats/prometheus.go index 6c71eaa1..e0aa7b2f 100644 --- a/internal/pkg/stats/prometheus.go +++ b/internal/pkg/stats/prometheus.go @@ -29,6 +29,12 @@ type prometheusStats struct { akamaiMitigated *prometheus.GaugeVec seencheckFailures *prometheus.CounterVec + // In-transit item flow metrics + preprocessorInTransit *prometheus.GaugeVec + archiverInTransit *prometheus.GaugeVec + postprocessorInTransit *prometheus.GaugeVec + finisherInTransit *prometheus.GaugeVec + // Dedup WARC metrics dataTotalBytes *prometheus.GaugeVec cdxDedupeTotalBytes *prometheus.GaugeVec @@ -145,6 +151,22 @@ func newPrometheusStats() *prometheusStats { prometheus.CounterOpts{Name: config.Get().PrometheusPrefix + "seencheck_failures", Help: "Total number of seencheck failures"}, []string{"project", "hostname", "version"}, ), + preprocessorInTransit: prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: config.Get().PrometheusPrefix + "preprocessor_in_transit", Help: "Number of items currently being processed by the preprocessor"}, + []string{"project", "hostname", "version"}, + ), + archiverInTransit: prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: config.Get().PrometheusPrefix + "archiver_in_transit", Help: "Number of items currently being processed by the archiver"}, + []string{"project", "hostname", "version"}, + ), + postprocessorInTransit: prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: config.Get().PrometheusPrefix + "postprocessor_in_transit", Help: "Number of items currently being processed by the postprocessor"}, + []string{"project", "hostname", "version"}, + ), + finisherInTransit: prometheus.NewGaugeVec( + prometheus.GaugeOpts{Name: config.Get().PrometheusPrefix + "finisher_in_transit", Help: "Number of items currently being processed by the finisher"}, + []string{"project", "hostname", "version"}, + ), } } @@ -168,6 +190,12 @@ func registerPrometheusMetrics() { prometheus.MustRegister(globalPromStats.akamaiMitigated) prometheus.MustRegister(globalPromStats.seencheckFailures) + // Register in-transit item flow metrics + prometheus.MustRegister(globalPromStats.preprocessorInTransit) + prometheus.MustRegister(globalPromStats.archiverInTransit) + prometheus.MustRegister(globalPromStats.postprocessorInTransit) + prometheus.MustRegister(globalPromStats.finisherInTransit) + // Register dedup WARC metrics prometheus.MustRegister(globalPromStats.dataTotalBytes) prometheus.MustRegister(globalPromStats.cdxDedupeTotalBytes) diff --git a/internal/pkg/stats/stats.go b/internal/pkg/stats/stats.go index 033c30cc..6ae3eb05 100644 --- a/internal/pkg/stats/stats.go +++ b/internal/pkg/stats/stats.go @@ -43,6 +43,11 @@ var ( ArchiverRoutines *GaugedCounter PostprocessorRoutines *GaugedCounter FinisherRoutines *GaugedCounter + + PreprocessorInTransit *GaugedCounter + ArchiverInTransit *GaugedCounter + PostprocessorInTransit *GaugedCounter + FinisherInTransit *GaugedCounter ) func Init() error { @@ -64,6 +69,11 @@ func Init() error { PostprocessorRoutines = &GaugedCounter{} FinisherRoutines = &GaugedCounter{} + PreprocessorInTransit = &GaugedCounter{} + ArchiverInTransit = &GaugedCounter{} + PostprocessorInTransit = &GaugedCounter{} + FinisherInTransit = &GaugedCounter{} + if config.Get() != nil && config.Get().Prometheus { globalPromStats = newPrometheusStats() @@ -83,6 +93,11 @@ func Init() error { PostprocessorRoutines.promGauge = globalPromStats.postprocessorRoutines FinisherRoutines.promGauge = globalPromStats.finisherRoutines + PreprocessorInTransit.promGauge = globalPromStats.preprocessorInTransit + ArchiverInTransit.promGauge = globalPromStats.archiverInTransit + PostprocessorInTransit.promGauge = globalPromStats.postprocessorInTransit + FinisherInTransit.promGauge = globalPromStats.finisherInTransit + registerPrometheusMetrics() } @@ -107,6 +122,10 @@ func Reset() { ArchiverRoutines.reset() PostprocessorRoutines.reset() FinisherRoutines.reset() + PreprocessorInTransit.reset() + ArchiverInTransit.reset() + PostprocessorInTransit.reset() + FinisherInTransit.reset() globalStats.HTTPReturnCodes.resetAll() globalStats.MeanHTTPResponseTime.reset() globalStats.MeanProcessBodyTime.reset() @@ -124,6 +143,10 @@ func GetMapTUI() map[string]any { "Archiver routines": ArchiverRoutines.Value(), "Postprocessor routines": PostprocessorRoutines.Value(), "Finisher routines": FinisherRoutines.Value(), + "Preprocessor in-transit": PreprocessorInTransit.Value(), + "Archiver in-transit": ArchiverInTransit.Value(), + "Postprocessor in-transit": PostprocessorInTransit.Value(), + "Finisher in-transit": FinisherInTransit.Value(), "Is paused?": globalStats.Paused.Load(), "HTTP 2xx/s": bucketSum(globalStats.HTTPReturnCodes.getFiltered("2*")), "HTTP 3xx/s": bucketSum(globalStats.HTTPReturnCodes.getFiltered("3*")),