Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 66 additions & 2 deletions pkg/metrics/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type SchedulerMetrics struct {
tryPreemptionLatency prometheus.Histogram
tryNodeEvaluation prometheus.Histogram
lock locking.RWMutex
tryNodeCount *prometheus.CounterVec
tryApplicationCount *prometheus.CounterVec
}

// InitSchedulerMetrics to initialize scheduler metrics
Expand All @@ -78,6 +80,16 @@ func InitSchedulerMetrics() *SchedulerMetrics {

s.nodeResourceUsage = make(map[string]*prometheus.GaugeVec) // Note: This map might be updated at runtime

initCounterMetrics(s)
initGaugeMetrics(s)
initHistogramMetrics(s)
registerSchedulerMetrics(s)

return s
}

// initCounterMetrics initializes counter-based metrics
func initCounterMetrics(s *SchedulerMetrics) {
s.containerAllocation = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Expand All @@ -94,6 +106,25 @@ func InitSchedulerMetrics() *SchedulerMetrics {
Help: "Total number of application submissions. State of the attempt includes `new`, `accepted` and `rejected`.",
}, []string{"result"})

s.tryNodeCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: SchedulerSubsystem,
Name: "trynode_count",
Help: "Total number of nodes evaluated during scheduling cycle",
}, nil)

s.tryApplicationCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: SchedulerSubsystem,
Name: "tryapplication_count",
Help: "Total number of applications evaluated during scheduling cycle",
}, nil)
}

// initGaugeMetrics initializes gauge-based metrics
func initGaugeMetrics(s *SchedulerMetrics) {
s.application = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Expand All @@ -109,7 +140,10 @@ func InitSchedulerMetrics() *SchedulerMetrics {
Name: "node",
Help: "Total number of nodes. State of the node includes `active` and `failed`.",
}, []string{"state"})
}

// initHistogramMetrics initializes histogram-based metrics
func initHistogramMetrics(s *SchedulerMetrics) {
s.schedulingLatency = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: Namespace,
Expand Down Expand Up @@ -168,8 +202,10 @@ func InitSchedulerMetrics() *SchedulerMetrics {
Buckets: prometheus.ExponentialBuckets(0.0001, 10, 8),
},
)
}

// Register the metrics
// registerSchedulerMetrics registers all scheduler metrics with Prometheus
func registerSchedulerMetrics(s *SchedulerMetrics) {
var metricsList = []prometheus.Collector{
s.containerAllocation,
s.applicationSubmission,
Expand All @@ -181,13 +217,14 @@ func InitSchedulerMetrics() *SchedulerMetrics {
s.schedulingCycle,
s.tryNodeEvaluation,
s.tryPreemptionLatency,
s.tryNodeCount,
s.tryApplicationCount,
}
for _, metric := range metricsList {
if err := prometheus.Register(metric); err != nil {
log.Log(log.Metrics).Warn("failed to register metrics collector", zap.Error(err))
}
}
return s
}

// Reset all metrics that implement the Reset functionality.
Expand All @@ -197,6 +234,8 @@ func (m *SchedulerMetrics) Reset() {
m.application.Reset()
m.applicationSubmission.Reset()
m.containerAllocation.Reset()
m.tryNodeCount.Reset()
m.tryApplicationCount.Reset()
}

func SinceInSeconds(start time.Time) float64 {
Expand Down Expand Up @@ -274,6 +313,31 @@ func (m *SchedulerMetrics) GetSchedulingErrors() (int, error) {
return -1, err
}

func (m *SchedulerMetrics) AddTryNodeCount(count int64) {
m.tryNodeCount.With(nil).Add(float64(count))
}

func (m *SchedulerMetrics) ResetTryNodeCount() {
m.tryNodeCount.Reset()
}

func (m *SchedulerMetrics) AddTryApplicationCount(count int64) {
m.tryApplicationCount.With(nil).Add(float64(count))
}

func (m *SchedulerMetrics) ResetTryApplicationCount() {
m.tryApplicationCount.Reset()
}

func (m *SchedulerMetrics) GetTryNodeCount() (int64, error) {
metricDto := &dto.Metric{}
err := m.tryNodeCount.With(nil).Write(metricDto)
if err == nil {
return int64(*metricDto.Counter.Value), nil
}
return -1, err
}

func (m *SchedulerMetrics) IncTotalApplicationsNew() {
m.applicationSubmission.WithLabelValues(AppNew).Inc()
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/metrics/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,44 @@ func verifyMetric(t *testing.T, expectedCounter float64, expectedState string, n
assert.Assert(t, checked, "Failed to find metric")
}

func TestTryNodeCount(t *testing.T) {
sm = getSchedulerMetrics(t)
defer unregisterMetrics()

sm.AddTryNodeCount(5)
count, err := sm.GetTryNodeCount()
assert.NilError(t, err)
assert.Equal(t, int64(5), count)

sm.AddTryNodeCount(3)
count, err = sm.GetTryNodeCount()
assert.NilError(t, err)
assert.Equal(t, int64(8), count)

sm.ResetTryNodeCount()
sm.AddTryNodeCount(1)
count, err = sm.GetTryNodeCount()
assert.NilError(t, err)
assert.Equal(t, int64(1), count)
}

func TestTryApplicationCount(t *testing.T) {
sm = getSchedulerMetrics(t)
defer unregisterMetrics()

sm.AddTryApplicationCount(4)
metricDto := &dto.Metric{}
err := sm.tryApplicationCount.With(nil).Write(metricDto)
assert.NilError(t, err)
assert.Equal(t, int64(4), int64(*metricDto.Counter.Value))

sm.ResetTryApplicationCount()
sm.AddTryApplicationCount(2)
err = sm.tryApplicationCount.With(nil).Write(metricDto)
assert.NilError(t, err)
assert.Equal(t, int64(2), int64(*metricDto.Counter.Value))
}

func unregisterMetrics() {
sm := GetSchedulerMetrics()
prometheus.Unregister(sm.containerAllocation)
Expand All @@ -244,4 +282,6 @@ func unregisterMetrics() {
prometheus.Unregister(sm.tryNodeLatency)
prometheus.Unregister(sm.tryNodeEvaluation)
prometheus.Unregister(sm.tryPreemptionLatency)
prometheus.Unregister(sm.tryNodeCount)
prometheus.Unregister(sm.tryApplicationCount)
}
5 changes: 5 additions & 0 deletions pkg/scheduler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (cc *ClusterContext) schedule() bool {
// schedule each partition defined in the cluster
activity := false
scheduleCycleStart := time.Now()

// Reset scheduling cycle counters at the start of each cycle
metrics.GetSchedulerMetrics().ResetTryNodeCount()
metrics.GetSchedulerMetrics().ResetTryApplicationCount()

for _, psc := range cc.GetPartitionMapClone() {
// if there are no resources in the partition just skip
if psc.root.GetMaxResource() == nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/objects/allocation_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type AllocationResult struct {
NodeID string
ReservedNodeID string
CancelledReservations int
NodesTried int64
ApplicationsTried int64
}

func (ar *AllocationResult) String() string {
Expand Down
Loading
Loading