From fc5a997ff8b53a78b3861b8f33b793907f520be9 Mon Sep 17 00:00:00 2001 From: "mishra.shubham" Date: Wed, 22 Apr 2026 20:11:31 +0530 Subject: [PATCH] [YUNIKORN-3243] Bounded visit guarantee for sibling queues When two sibling queues have highly asymmetric guaranteed resources (e.g. 3600:1), the DRF sort keeps the larger queue's ratio below the smaller queue's for thousands of cycles. Combined with the single-successful-allocation-per-cycle contract, the smaller queue is never visited by TryAllocate, which also keeps its asks invisible to the autoscaler (inspectOutstandingRequests gates on schedulingAttempted, which is only set inside app.tryAllocate()). This change adds a configurable bounded-visit guarantee at the parent queue level: * New queue property `queue.starvation.delay` (default 15s, 0 disables) configures how long a child queue may be skipped before the parent's sort hoists it to the front. * Each queue tracks `lastSchedulingAttempt`, started when pending demand transitions from zero, cleared when pending drains, and refreshed at the top of every TryAllocate descent. * sortQueues() runs the normal DRF/priority sort first, then hoistStarvedQueues() moves children whose elapsed time exceeds the delay to the front, ordered oldest-first so the most-neglected sibling wins when multiple queues cross the threshold in the same cycle. Non-starved children keep their DRF order. The implementation is local to queue.go, does not change the scheduling cycle's single-allocation contract (keeps K8s predicate correctness), and is opt-outable per queue. Integration tests reuse the 3600:1 reproducer from PR #1077 to validate that the sparkpi ask is served on the first cycle after the delay elapses and that its schedulingAttempted flag flips to true, restoring the autoscaler signal. Made-with: Cursor --- pkg/common/configs/configvalidator.go | 7 + pkg/scheduler/objects/queue.go | 170 ++++++++- .../objects/queue_starvation_hoist_test.go | 338 ++++++++++++++++++ pkg/scheduler/tests/queue_starvation_test.go | 226 ++++++++++++ 4 files changed, 740 insertions(+), 1 deletion(-) create mode 100644 pkg/scheduler/objects/queue_starvation_hoist_test.go diff --git a/pkg/common/configs/configvalidator.go b/pkg/common/configs/configvalidator.go index 3117368fe..837fa133b 100644 --- a/pkg/common/configs/configvalidator.go +++ b/pkg/common/configs/configvalidator.go @@ -51,6 +51,7 @@ const ( PreemptionPolicy = "preemption.policy" PreemptionDelay = "preemption.delay" QuotaPreemptionDelay = "quota.preemption.delay" + QueueStarvationDelay = "queue.starvation.delay" // app sort priority values ApplicationSortPriorityEnabled = "enabled" @@ -75,6 +76,12 @@ var DefaultQuotaPreemptionDelay time.Duration = 0 var DefaultPreemptionDelay = 30 * time.Second var DefaultAskBackOffDelay = 30 * time.Second +// DefaultQueueStarvationDelay is the default value for the queue.starvation.delay property. +// When a child queue has pending demand but the parent's sort has skipped it for longer than +// this duration, the parent's sort will hoist the starving child to the front of the order. +// A value of 0 disables the hoisting behaviour on that parent queue. See YUNIKORN-3243. +var DefaultQueueStarvationDelay = 15 * time.Second + // QueueNameRegExp to validate the name of a queue. // A queue can be a username with the dot replaced. Most systems allow a 32 character username. // The queue name must thus allow for at least that length with the replacement of dots. diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index db7a1f18b..5e2cb808f 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "sort" "strconv" "strings" "time" @@ -94,6 +95,15 @@ type Queue struct { isQuotaPreemptionRunning bool unschedAskBackoff uint64 askBackoffDelay time.Duration + // starvationDelay controls how long a child queue can remain unvisited by this + // queue's TryAllocate sort before it is hoisted to the front of the sibling order. + // 0 disables the hoisting behaviour on this queue. See YUNIKORN-3243. + starvationDelay time.Duration + // lastSchedulingAttempt is the last wall-clock time TryAllocate descended into + // this queue. Used by the parent's sort to detect queues that have been skipped + // for too long despite having pending demand. Zero value means "not tracked" + // (i.e. this queue currently has no pending demand). See YUNIKORN-3243. + lastSchedulingAttempt time.Time locking.RWMutex } @@ -119,6 +129,8 @@ func newBlankQueue() *Queue { quotaPreemptionDelay: 0, quotaPreemptionStartTime: time.Time{}, askBackoffDelay: configs.DefaultAskBackOffDelay, + starvationDelay: configs.DefaultQueueStarvationDelay, + lastSchedulingAttempt: time.Time{}, } } @@ -283,6 +295,20 @@ func convertDelay(value string, def time.Duration) (time.Duration, error) { return result, nil } +// convertStarvationDelay parses a duration string for queue.starvation.delay. +// Unlike convertDelay, a value of zero is allowed and means "feature disabled on this queue". +// Negative values fall back to the default. See YUNIKORN-3243. +func convertStarvationDelay(value string) (time.Duration, error) { + result, err := time.ParseDuration(value) + if err != nil { + return configs.DefaultQueueStarvationDelay, err + } + if int64(result) < int64(0) { + return configs.DefaultQueueStarvationDelay, fmt.Errorf("starvation delay must be non-negative: %s", value) + } + return result, nil +} + func priorityOffset(value string) (int32, error) { intValue, err := strconv.ParseInt(value, 10, 32) if err != nil { @@ -722,6 +748,12 @@ func (sq *Queue) UpdateQueueProperties(oldMaxResource *resources.Resource) { zap.Error(err)) } sq.setPreemptionTime(oldMaxResource, oldDelay) + case configs.QueueStarvationDelay: + sq.starvationDelay, err = convertStarvationDelay(value) + if err != nil { + log.Log(log.SchedQueue).Debug("queue starvation delay configuration error", + zap.Error(err)) + } default: // skip unknown properties just log them log.Log(log.SchedQueue).Debug("queue property skipped", @@ -926,7 +958,14 @@ func (sq *Queue) incPendingResource(delta *resources.Resource) { // update this queue sq.Lock() defer sq.Unlock() + wasZero := !resources.StrictlyGreaterThanZero(sq.pending) sq.pending = resources.Add(sq.pending, delta) + // Start the starvation clock when pending transitions from zero to non-zero. + // This gives a newly-pending queue a full grace window before it can be hoisted. + // See YUNIKORN-3243. + if wasZero && resources.StrictlyGreaterThanZero(sq.pending) && sq.lastSchedulingAttempt.IsZero() { + sq.lastSchedulingAttempt = time.Now() + } sq.updatePendingResourceMetrics() } @@ -955,7 +994,71 @@ func (sq *Queue) decPendingResource(delta *resources.Resource) { // If we prune the resource first and the resource become nil after pruning, // the metrics will not be updated with nil resource, this is not expected. sq.pending.Prune() + // Pending dropped to zero: stop tracking starvation until new demand arrives. + // See YUNIKORN-3243. + if !resources.StrictlyGreaterThanZero(sq.pending) { + sq.lastSchedulingAttempt = time.Time{} + } + } +} + +// GetStarvationDelay returns the configured starvation delay for this queue. +// When > 0, the queue's sortQueues() will hoist any child with pending demand +// that has not been visited within this duration to the front of the sort order. +// See YUNIKORN-3243. +func (sq *Queue) GetStarvationDelay() time.Duration { + sq.RLock() + defer sq.RUnlock() + return sq.starvationDelay +} + +// GetLastSchedulingAttempt returns the wall-clock time at which TryAllocate last +// descended into this queue. A zero value indicates the queue has no pending +// demand and is not currently being tracked. See YUNIKORN-3243. +func (sq *Queue) GetLastSchedulingAttempt() time.Time { + sq.RLock() + defer sq.RUnlock() + return sq.lastSchedulingAttempt +} + +// SetLastSchedulingAttempt overrides the last-scheduling-attempt timestamp. +// This is only intended for tests that need to simulate a queue that has been +// skipped for an extended period. +func (sq *Queue) SetLastSchedulingAttempt(t time.Time) { + sq.Lock() + defer sq.Unlock() + sq.lastSchedulingAttempt = t +} + +// markSchedulingAttempted records that TryAllocate has descended into this queue. +// Called at the entry of TryAllocate so that siblings whose TryAllocate was never +// reached (because an earlier sibling's allocation short-circuited the loop) keep +// their older timestamp and can be hoisted by the parent's next sort. +// See YUNIKORN-3243. +func (sq *Queue) markSchedulingAttempted() { + sq.Lock() + defer sq.Unlock() + sq.lastSchedulingAttempt = time.Now() +} + +// isStarvedAt reports whether this queue has pending demand and has not been +// visited by TryAllocate within the given delay, evaluated as of now. +// A zero or negative delay disables the check. A queue that has never had +// its clock started (zero lastSchedulingAttempt) is not considered starving. +// See YUNIKORN-3243. +func (sq *Queue) isStarvedAt(now time.Time, delay time.Duration) bool { + if delay <= 0 { + return false } + sq.RLock() + defer sq.RUnlock() + if sq.lastSchedulingAttempt.IsZero() { + return false + } + if !resources.StrictlyGreaterThanZero(sq.pending) { + return false + } + return now.Sub(sq.lastSchedulingAttempt) > delay } // AddApplication adds the application to the queue. All checks are assumed to have passed before we get here. @@ -1402,12 +1505,72 @@ func (sq *Queue) sortQueues() []*Queue { sortedMaxFairResources = append(sortedMaxFairResources, child.GetFairMaxResource()) } } - // Sort the queues + // Sort the queues using the configured policy sortQueue(sortedQueues, sortedMaxFairResources, sq.getSortType(), sq.IsPrioritySortEnabled()) + // Bounded-visit guarantee: hoist any child with pending demand that has not + // been visited within this queue's starvation delay to the front of the + // order. This preserves the relative order of both starved and non-starved + // children as produced by the policy sort above. See YUNIKORN-3243. + if delay := sq.GetStarvationDelay(); delay > 0 && len(sortedQueues) > 1 { + hoistStarvedQueues(sortedQueues, time.Now(), delay) + } + return sortedQueues } +// hoistStarvedQueues moves every child whose starvation clock has exceeded +// delay to the front of queues. Inside the starved group, queues are ordered +// by ascending lastSchedulingAttempt (oldest first) so the most-neglected +// queue wins the hoist. The relative order of the non-starved group is +// preserved. Lock free, invoked after the primary sort. See YUNIKORN-3243. +func hoistStarvedQueues(queues []*Queue, now time.Time, delay time.Duration) { + if delay <= 0 || len(queues) < 2 { + return + } + starvedIdx := make([]int, 0, len(queues)) + for i, q := range queues { + if q.isStarvedAt(now, delay) { + starvedIdx = append(starvedIdx, i) + } + } + if len(starvedIdx) == 0 { + return + } + starved := make([]*Queue, len(starvedIdx)) + for i, idx := range starvedIdx { + starved[i] = queues[idx] + } + // Oldest last-attempt first so queues that have been skipped the longest + // are served before queues that were visited more recently. Stable sort + // keeps deterministic ordering for equal timestamps. + sort.SliceStable(starved, func(i, j int) bool { + return starved[i].GetLastSchedulingAttempt().Before(starved[j].GetLastSchedulingAttempt()) + }) + nonStarved := make([]*Queue, 0, len(queues)-len(starved)) + starvedSet := make(map[int]struct{}, len(starvedIdx)) + for _, idx := range starvedIdx { + starvedSet[idx] = struct{}{} + } + for i, q := range queues { + if _, ok := starvedSet[i]; !ok { + nonStarved = append(nonStarved, q) + } + } + copy(queues, starved) + copy(queues[len(starved):], nonStarved) + if log.Log(log.SchedQueue).Core().Enabled(zap.DebugLevel) { + names := make([]string, 0, len(queues)) + for _, q := range queues { + names = append(names, q.QueuePath) + } + log.Log(log.SchedQueue).Debug("starvation hoist reordered sibling queues", + zap.Duration("delay", delay), + zap.Int("starvedCount", len(starved)), + zap.Strings("order", names)) + } +} + // getHeadRoom returns the headroom for the queue. This can never be more than the headroom for the parent. // In case there are no nodes in a newly started cluster and no queues have a limit configured this call // will return nil. @@ -1612,6 +1775,11 @@ func (sq *Queue) canRunApp(appID string) bool { // Applications are sorted based on the application sortPolicy. Applications without pending resources are skipped. // Lock free call this all locks are taken when needed in called functions func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() NodeIterator, getnode func(string) *Node, allowPreemption, quotaPreemption bool) *AllocationResult { + // Record the visit before any further work. Combined with the hoisting logic + // in the parent's sortQueues(), this guarantees that a queue with pending + // demand cannot be skipped indefinitely regardless of sibling DRF ratios. + // See YUNIKORN-3243. + sq.markSchedulingAttempted() if quotaPreemption && sq.shouldTriggerPreemption() { go func() { log.Log(log.SchedQueue).Info("Preconditions has passed trigger preemption to enforce new max resources", diff --git a/pkg/scheduler/objects/queue_starvation_hoist_test.go b/pkg/scheduler/objects/queue_starvation_hoist_test.go new file mode 100644 index 000000000..5d81bdf46 --- /dev/null +++ b/pkg/scheduler/objects/queue_starvation_hoist_test.go @@ -0,0 +1,338 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package objects + +import ( + "testing" + "time" + + "gotest.tools/v3/assert" + + "github.com/apache/yunikorn-core/pkg/common/configs" + "github.com/apache/yunikorn-core/pkg/common/resources" +) + +// These tests cover the bounded-visit guarantee introduced by YUNIKORN-3243. +// They exercise the per-queue starvation clock, the parent-level sortQueues +// hoisting, and the configuration surface that controls both. + +// TestStarvationDelayDefaultAndParsing verifies the default value and +// configuration parsing for the queue.starvation.delay property. The property +// must allow a zero value (feature disabled) unlike other delay-style +// properties that treat zero as invalid. +func TestStarvationDelayDefaultAndParsing(t *testing.T) { + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + assert.Equal(t, root.GetStarvationDelay(), configs.DefaultQueueStarvationDelay, + "default starvation delay should be used when no property is set") + + child, err := createManagedQueueWithProps(root, "delayed", true, nil, map[string]string{ + configs.QueueStarvationDelay: "5s", + }) + assert.NilError(t, err, "child create failed") + assert.Equal(t, child.GetStarvationDelay(), 5*time.Second, + "starvation delay property should be parsed") + + disabled, err := createManagedQueueWithProps(root, "disabled", true, nil, map[string]string{ + configs.QueueStarvationDelay: "0s", + }) + assert.NilError(t, err, "child create failed") + assert.Equal(t, disabled.GetStarvationDelay(), time.Duration(0), + "zero must be a valid value that disables hoisting") + + invalid, err := createManagedQueueWithProps(root, "invalid", true, nil, map[string]string{ + configs.QueueStarvationDelay: "not-a-duration", + }) + assert.NilError(t, err, "child create failed") + assert.Equal(t, invalid.GetStarvationDelay(), configs.DefaultQueueStarvationDelay, + "invalid duration should fall back to default") + + negative, err := createManagedQueueWithProps(root, "negative", true, nil, map[string]string{ + configs.QueueStarvationDelay: "-1s", + }) + assert.NilError(t, err, "child create failed") + assert.Equal(t, negative.GetStarvationDelay(), configs.DefaultQueueStarvationDelay, + "negative durations should fall back to default") +} + +// TestStarvationClockTracksPendingTransitions ensures the internal clock is +// started when demand arrives on an empty queue, reset when demand drains, and +// not double-started while demand stays positive. These transitions are the +// basis of the bounded-visit guarantee. +func TestStarvationClockTracksPendingTransitions(t *testing.T) { + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + parent, err := createManagedQueue(root, "parent", true, nil) + assert.NilError(t, err, "parent create failed") + leaf, err := createManagedQueue(parent, "leaf", false, nil) + assert.NilError(t, err, "leaf create failed") + + assert.Assert(t, leaf.GetLastSchedulingAttempt().IsZero(), + "clock must be zero before any demand arrives") + + res, err := resources.NewResourceFromConf(map[string]string{"first": "1"}) + assert.NilError(t, err, "failed to create basic resource") + + before := time.Now() + leaf.incPendingResource(res) + after := time.Now() + + started := leaf.GetLastSchedulingAttempt() + assert.Assert(t, !started.IsZero(), "clock must start when pending goes from zero to non-zero") + assert.Assert(t, !started.Before(before) && !started.After(after), + "clock must be set close to the moment demand arrived") + + // Adding more demand must not reset the clock; the grace window belongs to + // the first transition. + time.Sleep(2 * time.Millisecond) + leaf.incPendingResource(res) + assert.Equal(t, leaf.GetLastSchedulingAttempt(), started, + "clock must not be reset while demand remains positive") + + leaf.decPendingResource(res) + assert.Equal(t, leaf.GetLastSchedulingAttempt(), started, + "clock must stay while pending is still positive") + + leaf.decPendingResource(res) + assert.Assert(t, leaf.GetLastSchedulingAttempt().IsZero(), + "clock must reset to zero when pending drops to zero") + + // Second transition starts a new clock. + leaf.incPendingResource(res) + second := leaf.GetLastSchedulingAttempt() + assert.Assert(t, !second.IsZero(), "second transition must start the clock again") +} + +// TestIsStarvedAt covers the predicate used by the parent's sort. A queue is +// starving only when all of: (a) starvation hoisting is enabled, (b) the queue +// has pending demand, (c) the clock has been started, and (d) the elapsed time +// exceeds the configured delay are true. +func TestIsStarvedAt(t *testing.T) { + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + leaf, err := createManagedQueue(root, "leaf", false, nil) + assert.NilError(t, err, "leaf create failed") + + now := time.Now() + delay := 10 * time.Second + + assert.Assert(t, !leaf.isStarvedAt(now, 0), + "delay=0 must disable the check regardless of other state") + assert.Assert(t, !leaf.isStarvedAt(now, -time.Second), + "negative delay must be treated as disabled") + assert.Assert(t, !leaf.isStarvedAt(now, delay), + "a queue with zero pending must never be considered starving") + + res, err := resources.NewResourceFromConf(map[string]string{"first": "1"}) + assert.NilError(t, err, "failed to create basic resource") + leaf.incPendingResource(res) + + leaf.SetLastSchedulingAttempt(now.Add(-5 * time.Second)) + assert.Assert(t, !leaf.isStarvedAt(now, delay), + "a queue visited within the delay must not be flagged as starving") + + leaf.SetLastSchedulingAttempt(now.Add(-11 * time.Second)) + assert.Assert(t, leaf.isStarvedAt(now, delay), + "a queue skipped for longer than the delay must be flagged as starving") + + leaf.SetLastSchedulingAttempt(time.Time{}) + assert.Assert(t, !leaf.isStarvedAt(now, delay), + "zero clock means 'not tracked' and must not trigger hoisting") +} + +// TestHoistStarvedQueuesStablePartition locks in the ordering contract of +// hoistStarvedQueues: starved queues move to the front, but both the relative +// order of the starved and non-starved groups is preserved so that the DRF +// sort produced upstream still drives ties. +func TestHoistStarvedQueuesStablePartition(t *testing.T) { + now := time.Now() + delay := time.Second + + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + + mkQueue := func(name string, starved bool) *Queue { + q, qerr := createManagedQueue(root, name, false, nil) + assert.NilError(t, qerr, "queue create failed") + res, rerr := resources.NewResourceFromConf(map[string]string{"first": "1"}) + assert.NilError(t, rerr, "resource create failed") + q.incPendingResource(res) + if starved { + q.SetLastSchedulingAttempt(now.Add(-2 * delay)) + } else { + q.SetLastSchedulingAttempt(now) + } + return q + } + + // Build: [fresh, starved, fresh, starved, fresh]. Expected after the + // partition: [starved(1), starved(3), fresh(0), fresh(2), fresh(4)]. + a := mkQueue("a", false) + b := mkQueue("b", true) + c := mkQueue("c", false) + d := mkQueue("d", true) + e := mkQueue("e", false) + + ordered := []*Queue{a, b, c, d, e} + hoistStarvedQueues(ordered, now, delay) + + names := make([]string, len(ordered)) + for i, q := range ordered { + names[i] = q.Name + } + assert.DeepEqual(t, names, []string{"b", "d", "a", "c", "e"}) +} + +// TestHoistStarvedQueuesNoOpCases documents the early-return paths so that +// future refactors do not accidentally regress them. +func TestHoistStarvedQueuesNoOpCases(t *testing.T) { + now := time.Now() + + hoistStarvedQueues(nil, now, time.Second) // nil slice is safe + hoistStarvedQueues([]*Queue{}, now, time.Second) // empty slice is safe + + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + single, err := createManagedQueue(root, "single", false, nil) + assert.NilError(t, err, "queue create failed") + single.SetLastSchedulingAttempt(now.Add(-time.Hour)) + res, err := resources.NewResourceFromConf(map[string]string{"first": "1"}) + assert.NilError(t, err, "resource create failed") + single.incPendingResource(res) + + list := []*Queue{single} + hoistStarvedQueues(list, now, time.Second) + assert.Equal(t, list[0], single, "single-element slices are returned unchanged") + + // Disabled delay must be a no-op even if queues look starving. + other, err := createManagedQueue(root, "other", false, nil) + assert.NilError(t, err, "queue create failed") + other.incPendingResource(res) + other.SetLastSchedulingAttempt(now) + pair := []*Queue{single, other} + hoistStarvedQueues(pair, now, 0) + assert.Equal(t, pair[0], single, "delay=0 must not reorder queues") + assert.Equal(t, pair[1], other, "delay=0 must not reorder queues") +} + +// TestSortQueuesHoistsStarvedSibling is the end-to-end proof for the bug in +// YUNIKORN-3243. The DRF sort would normally place the larger queue first +// because its allocated/guaranteed ratio is lower, but once the smaller queue +// has been skipped past the starvation delay the parent's sort must pull it +// back to the front so that the next TryAllocate cycle visits it. +func TestSortQueuesHoistsStarvedSibling(t *testing.T) { + // Asymmetric guarantees reproduce the production 3600:1 ratio so the + // small queue's fair-share ratio stays dominant for thousands of cycles. + root, err := createRootQueueWithProps(map[string]string{ + configs.QueueStarvationDelay: "100ms", + }) + assert.NilError(t, err, "root create failed") + + large, err := createManagedQueueGuaranteed(root, "large", false, + map[string]string{"memory": "400G"}, + map[string]string{"memory": "360G"}, + nil) + assert.NilError(t, err, "large create failed") + small, err := createManagedQueueGuaranteed(root, "small", false, + map[string]string{"memory": "100M"}, + map[string]string{"memory": "100M"}, + nil) + assert.NilError(t, err, "small create failed") + + // Give small a prior allocation so small's ratio (0.1) dominates large's (0). + priorAlloc, err := resources.NewResourceFromConf(map[string]string{"memory": "10M"}) + assert.NilError(t, err, "resource create failed") + small.IncAllocatedResource(priorAlloc) + + largeDemand, err := resources.NewResourceFromConf(map[string]string{"memory": "1G"}) + assert.NilError(t, err, "resource create failed") + smallDemand, err := resources.NewResourceFromConf(map[string]string{"memory": "1M"}) + assert.NilError(t, err, "resource create failed") + large.incPendingResource(largeDemand) + small.incPendingResource(smallDemand) + + // Baseline: both queues have freshly-started clocks, so the normal DRF + // sort wins and large is visited first. + fresh := time.Now() + large.SetLastSchedulingAttempt(fresh) + small.SetLastSchedulingAttempt(fresh) + sorted := root.sortQueues() + assert.Equal(t, len(sorted), 2, "both children should be eligible for scheduling") + assert.Equal(t, sorted[0].Name, "large", + "without hoisting the large queue must sort first under DRF") + + // Simulate small being skipped past the starvation threshold. On the next + // sort it must be hoisted in front of large even though its DRF ratio is + // still higher. + small.SetLastSchedulingAttempt(time.Now().Add(-time.Second)) + sorted = root.sortQueues() + assert.Equal(t, len(sorted), 2) + assert.Equal(t, sorted[0].Name, "small", + "starved child must be hoisted to the front regardless of DRF ratio") + assert.Equal(t, sorted[1].Name, "large", + "non-starved sibling must keep its original relative position") +} + +// TestSortQueuesDisabledDelayKeepsDRF guards against accidentally enabling the +// hoist when an operator has turned it off. With delay=0 the DRF sort must be +// returned verbatim even for a queue that has been skipped for a long time. +func TestSortQueuesDisabledDelayKeepsDRF(t *testing.T) { + root, err := createRootQueueWithProps(map[string]string{ + configs.QueueStarvationDelay: "0s", + }) + assert.NilError(t, err, "root create failed") + + large, err := createManagedQueueGuaranteed(root, "large", false, + map[string]string{"memory": "400G"}, + map[string]string{"memory": "360G"}, + nil) + assert.NilError(t, err, "large create failed") + small, err := createManagedQueueGuaranteed(root, "small", false, + map[string]string{"memory": "100M"}, + map[string]string{"memory": "100M"}, + nil) + assert.NilError(t, err, "small create failed") + + priorAlloc, err := resources.NewResourceFromConf(map[string]string{"memory": "10M"}) + assert.NilError(t, err, "resource create failed") + small.IncAllocatedResource(priorAlloc) + + largeDemand, err := resources.NewResourceFromConf(map[string]string{"memory": "1G"}) + assert.NilError(t, err, "resource create failed") + smallDemand, err := resources.NewResourceFromConf(map[string]string{"memory": "1M"}) + assert.NilError(t, err, "resource create failed") + large.incPendingResource(largeDemand) + small.incPendingResource(smallDemand) + + // Small has been "starving" for an hour, but the feature is disabled. + small.SetLastSchedulingAttempt(time.Now().Add(-time.Hour)) + large.SetLastSchedulingAttempt(time.Now()) + + sorted := root.sortQueues() + assert.Equal(t, len(sorted), 2) + assert.Equal(t, sorted[0].Name, "large", + "disabled hoisting must leave the DRF order untouched") +} + +// createRootQueueWithProps is a local helper until the shared utility in +// utilities_test.go grows a props-taking root helper. Kept in this file so it +// does not conflict with existing helpers. +func createRootQueueWithProps(props map[string]string) (*Queue, error) { + return createManagedQueueWithProps(nil, "root", true, nil, props) +} diff --git a/pkg/scheduler/tests/queue_starvation_test.go b/pkg/scheduler/tests/queue_starvation_test.go index c60483847..c8f6a813a 100644 --- a/pkg/scheduler/tests/queue_starvation_test.go +++ b/pkg/scheduler/tests/queue_starvation_test.go @@ -21,6 +21,7 @@ package tests import ( "fmt" "testing" + "time" "gotest.tools/v3/assert" @@ -1844,3 +1845,228 @@ partitions: "CONTRAST: Without priority.offset, sparkpi is starved due to fair-share ratio. "+ "This proves priority.offset is the specific config change that eliminates starvation.") } + +// TestStarvationDelayHoistsSkippedSibling is the end-to-end validation of the +// YUNIKORN-3243 fix. With queue.starvation.delay configured on the parent and +// the clock artificially advanced past that delay, the starved sibling must be +// hoisted to the front of the next scheduling cycle and served — and with it +// the schedulingAttempted flag on its asks must flip to true so the +// autoscaler signal path (inspectOutstandingRequests) is no longer blind. +// +// The clock is advanced via Queue.SetLastSchedulingAttempt rather than with a +// wall-clock sleep so the test stays fast and deterministic. +func TestStarvationDelayHoistsSkippedSibling(t *testing.T) { + // Same 3600:1 asymmetric-guarantees setup as TestQueueStarvationWithPriorAllocation + // but with queue.starvation.delay configured on root. + configData := ` +partitions: + - name: default + queues: + - name: root + submitacl: "*" + properties: + queue.starvation.delay: 100ms + queues: + - name: large + resources: + guaranteed: + memory: 360G + max: + memory: 400G + - name: small + resources: + guaranteed: + memory: 100M + max: + memory: 100M +` + ms := &mockScheduler{} + defer ms.Stop() + + err := ms.Init(configData, false, false) + assert.NilError(t, err, "RegisterResourceManager failed") + + err = ms.proxy.UpdateNode(&si.NodeRequest{ + Nodes: []*si.NodeInfo{ + { + NodeID: "node-1:1234", + Attributes: map[string]string{}, + SchedulableResource: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 500000000000}, + }, + }, + Action: si.NodeInfo_CREATE, + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "NodeRequest failed") + ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) + + // Seed small with a prior allocation so its DRF ratio (0.1) dominates + // large's (0.0), which is what causes the starvation in the first place. + err = ms.addApp("prior-app", "root.small", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "prior-app", 1000) + err = ms.addAppRequest("prior-app", "prior", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 10000000}}, + }, 1) + assert.NilError(t, err) + + smallQueue := ms.getQueue("root.small") + largeQueue := ms.getQueue("root.large") + waitForPendingQueueResource(t, smallQueue, 10000000, 1000) + ms.scheduler.MultiStepSchedule(3) + ms.mockRM.waitForAllocations(t, 1, 1000) + + // Add the noisy neighbour on the large queue plus the small sparkpi ask + // that would normally be starved indefinitely. + err = ms.addApp("large-app", "root.large", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "large-app", 1000) + err = ms.addAppRequest("large-app", "large", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 1000000000}}, + }, 20) + assert.NilError(t, err) + + err = ms.addApp("sparkpi-app", "root.small", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "sparkpi-app", 1000) + err = ms.addAppRequest("sparkpi-app", "sparkpi", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 1000000}}, + }, 1) + assert.NilError(t, err) + + waitForPendingQueueResource(t, largeQueue, 20*1000000000, 1000) + waitForPendingQueueResource(t, smallQueue, 1000000, 1000) + + // A single quick cycle still starves small: its clock has only just + // started (demand arrived moments ago) so it is inside the grace window + // and the DRF sort wins normally. + ms.scheduler.MultiStepSchedule(1) + sparkpiApp := ms.getApplication("sparkpi-app") + assert.Equal(t, + int(sparkpiApp.GetAllocatedResource().Resources[siCommon.Memory]), 0, + "baseline: within the grace window, DRF sort is unchanged and small is still starved") + sparkpiAsk := sparkpiApp.GetAllocationAsk("sparkpi-0") + assert.Assert(t, sparkpiAsk != nil, "sparkpi ask should exist") + assert.Check(t, !sparkpiAsk.IsSchedulingAttempted(), + "baseline: schedulingAttempted should still be false inside the grace window") + + // Simulate small being skipped past the starvation delay without waiting + // for wall-clock time. The parent's next sortQueues() must hoist small + // because its lastSchedulingAttempt is now older than any other sibling's. + smallQueue.SetLastSchedulingAttempt(time.Now().Add(-time.Second)) + + // A single cycle is enough: the hoist runs inside sortQueues and small is + // visited before large. + ms.scheduler.MultiStepSchedule(1) + + sparkpiMem := int(sparkpiApp.GetAllocatedResource().Resources[siCommon.Memory]) + assert.Equal(t, sparkpiMem, 1000000, + "FIX CONFIRMED: once small has been skipped past queue.starvation.delay, "+ + "the parent's sort hoists it to the front and the ask is served on "+ + "the very next cycle even though its DRF ratio is still dominant.") + + assert.Check(t, sparkpiAsk.IsSchedulingAttempted(), + "AUTOSCALER SIGNAL RESTORED: visiting the hoisted queue runs "+ + "app.tryAllocate() which flips schedulingAttempted to true, so "+ + "inspectOutstandingRequests will now include this ask.") + + t.Logf("queue.starvation.delay eliminates starvation: sparkpi served after hoist, "+ + "schedulingAttempted=%v", sparkpiAsk.IsSchedulingAttempted()) +} + +// TestStarvationDelayDisabledPreservesDRFBehaviour pins the behavioural +// contract when the hoisting feature is explicitly turned off on the parent: +// the scheduler must keep the pre-existing DRF-only sort, reproducing the +// original starvation bug. This is important so operators who rely on the +// legacy behaviour can opt out cleanly. +func TestStarvationDelayDisabledPreservesDRFBehaviour(t *testing.T) { + configData := ` +partitions: + - name: default + queues: + - name: root + submitacl: "*" + properties: + queue.starvation.delay: "0s" + queues: + - name: large + resources: + guaranteed: + memory: 360G + max: + memory: 400G + - name: small + resources: + guaranteed: + memory: 100M + max: + memory: 100M +` + ms := &mockScheduler{} + defer ms.Stop() + + err := ms.Init(configData, false, false) + assert.NilError(t, err, "RegisterResourceManager failed") + + err = ms.proxy.UpdateNode(&si.NodeRequest{ + Nodes: []*si.NodeInfo{ + { + NodeID: "node-1:1234", + Attributes: map[string]string{}, + SchedulableResource: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 500000000000}, + }, + }, + Action: si.NodeInfo_CREATE, + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "NodeRequest failed") + ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) + + err = ms.addApp("prior-app", "root.small", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "prior-app", 1000) + err = ms.addAppRequest("prior-app", "prior", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 10000000}}, + }, 1) + assert.NilError(t, err) + smallQueue := ms.getQueue("root.small") + waitForPendingQueueResource(t, smallQueue, 10000000, 1000) + ms.scheduler.MultiStepSchedule(3) + ms.mockRM.waitForAllocations(t, 1, 1000) + + err = ms.addApp("large-app", "root.large", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "large-app", 1000) + err = ms.addAppRequest("large-app", "large", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 1000000000}}, + }, 20) + assert.NilError(t, err) + err = ms.addApp("sparkpi-app", "root.small", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "sparkpi-app", 1000) + err = ms.addAppRequest("sparkpi-app", "sparkpi", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 1000000}}, + }, 1) + assert.NilError(t, err) + waitForPendingQueueResource(t, ms.getQueue("root.large"), 20*1000000000, 1000) + waitForPendingQueueResource(t, smallQueue, 1000000, 1000) + + // Even after artificially advancing the clock far past any sensible delay, + // hoisting must stay disabled because the parent is configured with 0. + smallQueue.SetLastSchedulingAttempt(time.Now().Add(-time.Hour)) + ms.scheduler.MultiStepSchedule(20) + + sparkpiApp := ms.getApplication("sparkpi-app") + sparkpiMem := int(sparkpiApp.GetAllocatedResource().Resources[siCommon.Memory]) + assert.Equal(t, sparkpiMem, 0, + "With queue.starvation.delay=0, the legacy DRF-only behaviour must be "+ + "preserved: small remains starved for the duration of the test.") +}