diff --git a/pkg/common/configs/configvalidator.go b/pkg/common/configs/configvalidator.go index 3117368fe..837fa133b 100644 --- a/pkg/common/configs/configvalidator.go +++ b/pkg/common/configs/configvalidator.go @@ -51,6 +51,7 @@ const ( PreemptionPolicy = "preemption.policy" PreemptionDelay = "preemption.delay" QuotaPreemptionDelay = "quota.preemption.delay" + QueueStarvationDelay = "queue.starvation.delay" // app sort priority values ApplicationSortPriorityEnabled = "enabled" @@ -75,6 +76,12 @@ var DefaultQuotaPreemptionDelay time.Duration = 0 var DefaultPreemptionDelay = 30 * time.Second var DefaultAskBackOffDelay = 30 * time.Second +// DefaultQueueStarvationDelay is the default value for the queue.starvation.delay property. +// When a child queue has pending demand but the parent's sort has skipped it for longer than +// this duration, the parent's sort will hoist the starving child to the front of the order. +// A value of 0 disables the hoisting behaviour on that parent queue. See YUNIKORN-3243. +var DefaultQueueStarvationDelay = 15 * time.Second + // QueueNameRegExp to validate the name of a queue. // A queue can be a username with the dot replaced. Most systems allow a 32 character username. // The queue name must thus allow for at least that length with the replacement of dots. diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index db7a1f18b..5e2cb808f 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "sort" "strconv" "strings" "time" @@ -94,6 +95,15 @@ type Queue struct { isQuotaPreemptionRunning bool unschedAskBackoff uint64 askBackoffDelay time.Duration + // starvationDelay controls how long a child queue can remain unvisited by this + // queue's TryAllocate sort before it is hoisted to the front of the sibling order. + // 0 disables the hoisting behaviour on this queue. See YUNIKORN-3243. + starvationDelay time.Duration + // lastSchedulingAttempt is the last wall-clock time TryAllocate descended into + // this queue. Used by the parent's sort to detect queues that have been skipped + // for too long despite having pending demand. Zero value means "not tracked" + // (i.e. this queue currently has no pending demand). See YUNIKORN-3243. + lastSchedulingAttempt time.Time locking.RWMutex } @@ -119,6 +129,8 @@ func newBlankQueue() *Queue { quotaPreemptionDelay: 0, quotaPreemptionStartTime: time.Time{}, askBackoffDelay: configs.DefaultAskBackOffDelay, + starvationDelay: configs.DefaultQueueStarvationDelay, + lastSchedulingAttempt: time.Time{}, } } @@ -283,6 +295,20 @@ func convertDelay(value string, def time.Duration) (time.Duration, error) { return result, nil } +// convertStarvationDelay parses a duration string for queue.starvation.delay. +// Unlike convertDelay, a value of zero is allowed and means "feature disabled on this queue". +// Negative values fall back to the default. See YUNIKORN-3243. +func convertStarvationDelay(value string) (time.Duration, error) { + result, err := time.ParseDuration(value) + if err != nil { + return configs.DefaultQueueStarvationDelay, err + } + if int64(result) < int64(0) { + return configs.DefaultQueueStarvationDelay, fmt.Errorf("starvation delay must be non-negative: %s", value) + } + return result, nil +} + func priorityOffset(value string) (int32, error) { intValue, err := strconv.ParseInt(value, 10, 32) if err != nil { @@ -722,6 +748,12 @@ func (sq *Queue) UpdateQueueProperties(oldMaxResource *resources.Resource) { zap.Error(err)) } sq.setPreemptionTime(oldMaxResource, oldDelay) + case configs.QueueStarvationDelay: + sq.starvationDelay, err = convertStarvationDelay(value) + if err != nil { + log.Log(log.SchedQueue).Debug("queue starvation delay configuration error", + zap.Error(err)) + } default: // skip unknown properties just log them log.Log(log.SchedQueue).Debug("queue property skipped", @@ -926,7 +958,14 @@ func (sq *Queue) incPendingResource(delta *resources.Resource) { // update this queue sq.Lock() defer sq.Unlock() + wasZero := !resources.StrictlyGreaterThanZero(sq.pending) sq.pending = resources.Add(sq.pending, delta) + // Start the starvation clock when pending transitions from zero to non-zero. + // This gives a newly-pending queue a full grace window before it can be hoisted. + // See YUNIKORN-3243. + if wasZero && resources.StrictlyGreaterThanZero(sq.pending) && sq.lastSchedulingAttempt.IsZero() { + sq.lastSchedulingAttempt = time.Now() + } sq.updatePendingResourceMetrics() } @@ -955,7 +994,71 @@ func (sq *Queue) decPendingResource(delta *resources.Resource) { // If we prune the resource first and the resource become nil after pruning, // the metrics will not be updated with nil resource, this is not expected. sq.pending.Prune() + // Pending dropped to zero: stop tracking starvation until new demand arrives. + // See YUNIKORN-3243. + if !resources.StrictlyGreaterThanZero(sq.pending) { + sq.lastSchedulingAttempt = time.Time{} + } + } +} + +// GetStarvationDelay returns the configured starvation delay for this queue. +// When > 0, the queue's sortQueues() will hoist any child with pending demand +// that has not been visited within this duration to the front of the sort order. +// See YUNIKORN-3243. +func (sq *Queue) GetStarvationDelay() time.Duration { + sq.RLock() + defer sq.RUnlock() + return sq.starvationDelay +} + +// GetLastSchedulingAttempt returns the wall-clock time at which TryAllocate last +// descended into this queue. A zero value indicates the queue has no pending +// demand and is not currently being tracked. See YUNIKORN-3243. +func (sq *Queue) GetLastSchedulingAttempt() time.Time { + sq.RLock() + defer sq.RUnlock() + return sq.lastSchedulingAttempt +} + +// SetLastSchedulingAttempt overrides the last-scheduling-attempt timestamp. +// This is only intended for tests that need to simulate a queue that has been +// skipped for an extended period. +func (sq *Queue) SetLastSchedulingAttempt(t time.Time) { + sq.Lock() + defer sq.Unlock() + sq.lastSchedulingAttempt = t +} + +// markSchedulingAttempted records that TryAllocate has descended into this queue. +// Called at the entry of TryAllocate so that siblings whose TryAllocate was never +// reached (because an earlier sibling's allocation short-circuited the loop) keep +// their older timestamp and can be hoisted by the parent's next sort. +// See YUNIKORN-3243. +func (sq *Queue) markSchedulingAttempted() { + sq.Lock() + defer sq.Unlock() + sq.lastSchedulingAttempt = time.Now() +} + +// isStarvedAt reports whether this queue has pending demand and has not been +// visited by TryAllocate within the given delay, evaluated as of now. +// A zero or negative delay disables the check. A queue that has never had +// its clock started (zero lastSchedulingAttempt) is not considered starving. +// See YUNIKORN-3243. +func (sq *Queue) isStarvedAt(now time.Time, delay time.Duration) bool { + if delay <= 0 { + return false } + sq.RLock() + defer sq.RUnlock() + if sq.lastSchedulingAttempt.IsZero() { + return false + } + if !resources.StrictlyGreaterThanZero(sq.pending) { + return false + } + return now.Sub(sq.lastSchedulingAttempt) > delay } // AddApplication adds the application to the queue. All checks are assumed to have passed before we get here. @@ -1402,12 +1505,72 @@ func (sq *Queue) sortQueues() []*Queue { sortedMaxFairResources = append(sortedMaxFairResources, child.GetFairMaxResource()) } } - // Sort the queues + // Sort the queues using the configured policy sortQueue(sortedQueues, sortedMaxFairResources, sq.getSortType(), sq.IsPrioritySortEnabled()) + // Bounded-visit guarantee: hoist any child with pending demand that has not + // been visited within this queue's starvation delay to the front of the + // order. This preserves the relative order of both starved and non-starved + // children as produced by the policy sort above. See YUNIKORN-3243. + if delay := sq.GetStarvationDelay(); delay > 0 && len(sortedQueues) > 1 { + hoistStarvedQueues(sortedQueues, time.Now(), delay) + } + return sortedQueues } +// hoistStarvedQueues moves every child whose starvation clock has exceeded +// delay to the front of queues. Inside the starved group, queues are ordered +// by ascending lastSchedulingAttempt (oldest first) so the most-neglected +// queue wins the hoist. The relative order of the non-starved group is +// preserved. Lock free, invoked after the primary sort. See YUNIKORN-3243. +func hoistStarvedQueues(queues []*Queue, now time.Time, delay time.Duration) { + if delay <= 0 || len(queues) < 2 { + return + } + starvedIdx := make([]int, 0, len(queues)) + for i, q := range queues { + if q.isStarvedAt(now, delay) { + starvedIdx = append(starvedIdx, i) + } + } + if len(starvedIdx) == 0 { + return + } + starved := make([]*Queue, len(starvedIdx)) + for i, idx := range starvedIdx { + starved[i] = queues[idx] + } + // Oldest last-attempt first so queues that have been skipped the longest + // are served before queues that were visited more recently. Stable sort + // keeps deterministic ordering for equal timestamps. + sort.SliceStable(starved, func(i, j int) bool { + return starved[i].GetLastSchedulingAttempt().Before(starved[j].GetLastSchedulingAttempt()) + }) + nonStarved := make([]*Queue, 0, len(queues)-len(starved)) + starvedSet := make(map[int]struct{}, len(starvedIdx)) + for _, idx := range starvedIdx { + starvedSet[idx] = struct{}{} + } + for i, q := range queues { + if _, ok := starvedSet[i]; !ok { + nonStarved = append(nonStarved, q) + } + } + copy(queues, starved) + copy(queues[len(starved):], nonStarved) + if log.Log(log.SchedQueue).Core().Enabled(zap.DebugLevel) { + names := make([]string, 0, len(queues)) + for _, q := range queues { + names = append(names, q.QueuePath) + } + log.Log(log.SchedQueue).Debug("starvation hoist reordered sibling queues", + zap.Duration("delay", delay), + zap.Int("starvedCount", len(starved)), + zap.Strings("order", names)) + } +} + // getHeadRoom returns the headroom for the queue. This can never be more than the headroom for the parent. // In case there are no nodes in a newly started cluster and no queues have a limit configured this call // will return nil. @@ -1612,6 +1775,11 @@ func (sq *Queue) canRunApp(appID string) bool { // Applications are sorted based on the application sortPolicy. Applications without pending resources are skipped. // Lock free call this all locks are taken when needed in called functions func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() NodeIterator, getnode func(string) *Node, allowPreemption, quotaPreemption bool) *AllocationResult { + // Record the visit before any further work. Combined with the hoisting logic + // in the parent's sortQueues(), this guarantees that a queue with pending + // demand cannot be skipped indefinitely regardless of sibling DRF ratios. + // See YUNIKORN-3243. + sq.markSchedulingAttempted() if quotaPreemption && sq.shouldTriggerPreemption() { go func() { log.Log(log.SchedQueue).Info("Preconditions has passed trigger preemption to enforce new max resources", diff --git a/pkg/scheduler/objects/queue_starvation_hoist_test.go b/pkg/scheduler/objects/queue_starvation_hoist_test.go new file mode 100644 index 000000000..5d81bdf46 --- /dev/null +++ b/pkg/scheduler/objects/queue_starvation_hoist_test.go @@ -0,0 +1,338 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package objects + +import ( + "testing" + "time" + + "gotest.tools/v3/assert" + + "github.com/apache/yunikorn-core/pkg/common/configs" + "github.com/apache/yunikorn-core/pkg/common/resources" +) + +// These tests cover the bounded-visit guarantee introduced by YUNIKORN-3243. +// They exercise the per-queue starvation clock, the parent-level sortQueues +// hoisting, and the configuration surface that controls both. + +// TestStarvationDelayDefaultAndParsing verifies the default value and +// configuration parsing for the queue.starvation.delay property. The property +// must allow a zero value (feature disabled) unlike other delay-style +// properties that treat zero as invalid. +func TestStarvationDelayDefaultAndParsing(t *testing.T) { + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + assert.Equal(t, root.GetStarvationDelay(), configs.DefaultQueueStarvationDelay, + "default starvation delay should be used when no property is set") + + child, err := createManagedQueueWithProps(root, "delayed", true, nil, map[string]string{ + configs.QueueStarvationDelay: "5s", + }) + assert.NilError(t, err, "child create failed") + assert.Equal(t, child.GetStarvationDelay(), 5*time.Second, + "starvation delay property should be parsed") + + disabled, err := createManagedQueueWithProps(root, "disabled", true, nil, map[string]string{ + configs.QueueStarvationDelay: "0s", + }) + assert.NilError(t, err, "child create failed") + assert.Equal(t, disabled.GetStarvationDelay(), time.Duration(0), + "zero must be a valid value that disables hoisting") + + invalid, err := createManagedQueueWithProps(root, "invalid", true, nil, map[string]string{ + configs.QueueStarvationDelay: "not-a-duration", + }) + assert.NilError(t, err, "child create failed") + assert.Equal(t, invalid.GetStarvationDelay(), configs.DefaultQueueStarvationDelay, + "invalid duration should fall back to default") + + negative, err := createManagedQueueWithProps(root, "negative", true, nil, map[string]string{ + configs.QueueStarvationDelay: "-1s", + }) + assert.NilError(t, err, "child create failed") + assert.Equal(t, negative.GetStarvationDelay(), configs.DefaultQueueStarvationDelay, + "negative durations should fall back to default") +} + +// TestStarvationClockTracksPendingTransitions ensures the internal clock is +// started when demand arrives on an empty queue, reset when demand drains, and +// not double-started while demand stays positive. These transitions are the +// basis of the bounded-visit guarantee. +func TestStarvationClockTracksPendingTransitions(t *testing.T) { + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + parent, err := createManagedQueue(root, "parent", true, nil) + assert.NilError(t, err, "parent create failed") + leaf, err := createManagedQueue(parent, "leaf", false, nil) + assert.NilError(t, err, "leaf create failed") + + assert.Assert(t, leaf.GetLastSchedulingAttempt().IsZero(), + "clock must be zero before any demand arrives") + + res, err := resources.NewResourceFromConf(map[string]string{"first": "1"}) + assert.NilError(t, err, "failed to create basic resource") + + before := time.Now() + leaf.incPendingResource(res) + after := time.Now() + + started := leaf.GetLastSchedulingAttempt() + assert.Assert(t, !started.IsZero(), "clock must start when pending goes from zero to non-zero") + assert.Assert(t, !started.Before(before) && !started.After(after), + "clock must be set close to the moment demand arrived") + + // Adding more demand must not reset the clock; the grace window belongs to + // the first transition. + time.Sleep(2 * time.Millisecond) + leaf.incPendingResource(res) + assert.Equal(t, leaf.GetLastSchedulingAttempt(), started, + "clock must not be reset while demand remains positive") + + leaf.decPendingResource(res) + assert.Equal(t, leaf.GetLastSchedulingAttempt(), started, + "clock must stay while pending is still positive") + + leaf.decPendingResource(res) + assert.Assert(t, leaf.GetLastSchedulingAttempt().IsZero(), + "clock must reset to zero when pending drops to zero") + + // Second transition starts a new clock. + leaf.incPendingResource(res) + second := leaf.GetLastSchedulingAttempt() + assert.Assert(t, !second.IsZero(), "second transition must start the clock again") +} + +// TestIsStarvedAt covers the predicate used by the parent's sort. A queue is +// starving only when all of: (a) starvation hoisting is enabled, (b) the queue +// has pending demand, (c) the clock has been started, and (d) the elapsed time +// exceeds the configured delay are true. +func TestIsStarvedAt(t *testing.T) { + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + leaf, err := createManagedQueue(root, "leaf", false, nil) + assert.NilError(t, err, "leaf create failed") + + now := time.Now() + delay := 10 * time.Second + + assert.Assert(t, !leaf.isStarvedAt(now, 0), + "delay=0 must disable the check regardless of other state") + assert.Assert(t, !leaf.isStarvedAt(now, -time.Second), + "negative delay must be treated as disabled") + assert.Assert(t, !leaf.isStarvedAt(now, delay), + "a queue with zero pending must never be considered starving") + + res, err := resources.NewResourceFromConf(map[string]string{"first": "1"}) + assert.NilError(t, err, "failed to create basic resource") + leaf.incPendingResource(res) + + leaf.SetLastSchedulingAttempt(now.Add(-5 * time.Second)) + assert.Assert(t, !leaf.isStarvedAt(now, delay), + "a queue visited within the delay must not be flagged as starving") + + leaf.SetLastSchedulingAttempt(now.Add(-11 * time.Second)) + assert.Assert(t, leaf.isStarvedAt(now, delay), + "a queue skipped for longer than the delay must be flagged as starving") + + leaf.SetLastSchedulingAttempt(time.Time{}) + assert.Assert(t, !leaf.isStarvedAt(now, delay), + "zero clock means 'not tracked' and must not trigger hoisting") +} + +// TestHoistStarvedQueuesStablePartition locks in the ordering contract of +// hoistStarvedQueues: starved queues move to the front, but both the relative +// order of the starved and non-starved groups is preserved so that the DRF +// sort produced upstream still drives ties. +func TestHoistStarvedQueuesStablePartition(t *testing.T) { + now := time.Now() + delay := time.Second + + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + + mkQueue := func(name string, starved bool) *Queue { + q, qerr := createManagedQueue(root, name, false, nil) + assert.NilError(t, qerr, "queue create failed") + res, rerr := resources.NewResourceFromConf(map[string]string{"first": "1"}) + assert.NilError(t, rerr, "resource create failed") + q.incPendingResource(res) + if starved { + q.SetLastSchedulingAttempt(now.Add(-2 * delay)) + } else { + q.SetLastSchedulingAttempt(now) + } + return q + } + + // Build: [fresh, starved, fresh, starved, fresh]. Expected after the + // partition: [starved(1), starved(3), fresh(0), fresh(2), fresh(4)]. + a := mkQueue("a", false) + b := mkQueue("b", true) + c := mkQueue("c", false) + d := mkQueue("d", true) + e := mkQueue("e", false) + + ordered := []*Queue{a, b, c, d, e} + hoistStarvedQueues(ordered, now, delay) + + names := make([]string, len(ordered)) + for i, q := range ordered { + names[i] = q.Name + } + assert.DeepEqual(t, names, []string{"b", "d", "a", "c", "e"}) +} + +// TestHoistStarvedQueuesNoOpCases documents the early-return paths so that +// future refactors do not accidentally regress them. +func TestHoistStarvedQueuesNoOpCases(t *testing.T) { + now := time.Now() + + hoistStarvedQueues(nil, now, time.Second) // nil slice is safe + hoistStarvedQueues([]*Queue{}, now, time.Second) // empty slice is safe + + root, err := createRootQueue(nil) + assert.NilError(t, err, "queue create failed") + single, err := createManagedQueue(root, "single", false, nil) + assert.NilError(t, err, "queue create failed") + single.SetLastSchedulingAttempt(now.Add(-time.Hour)) + res, err := resources.NewResourceFromConf(map[string]string{"first": "1"}) + assert.NilError(t, err, "resource create failed") + single.incPendingResource(res) + + list := []*Queue{single} + hoistStarvedQueues(list, now, time.Second) + assert.Equal(t, list[0], single, "single-element slices are returned unchanged") + + // Disabled delay must be a no-op even if queues look starving. + other, err := createManagedQueue(root, "other", false, nil) + assert.NilError(t, err, "queue create failed") + other.incPendingResource(res) + other.SetLastSchedulingAttempt(now) + pair := []*Queue{single, other} + hoistStarvedQueues(pair, now, 0) + assert.Equal(t, pair[0], single, "delay=0 must not reorder queues") + assert.Equal(t, pair[1], other, "delay=0 must not reorder queues") +} + +// TestSortQueuesHoistsStarvedSibling is the end-to-end proof for the bug in +// YUNIKORN-3243. The DRF sort would normally place the larger queue first +// because its allocated/guaranteed ratio is lower, but once the smaller queue +// has been skipped past the starvation delay the parent's sort must pull it +// back to the front so that the next TryAllocate cycle visits it. +func TestSortQueuesHoistsStarvedSibling(t *testing.T) { + // Asymmetric guarantees reproduce the production 3600:1 ratio so the + // small queue's fair-share ratio stays dominant for thousands of cycles. + root, err := createRootQueueWithProps(map[string]string{ + configs.QueueStarvationDelay: "100ms", + }) + assert.NilError(t, err, "root create failed") + + large, err := createManagedQueueGuaranteed(root, "large", false, + map[string]string{"memory": "400G"}, + map[string]string{"memory": "360G"}, + nil) + assert.NilError(t, err, "large create failed") + small, err := createManagedQueueGuaranteed(root, "small", false, + map[string]string{"memory": "100M"}, + map[string]string{"memory": "100M"}, + nil) + assert.NilError(t, err, "small create failed") + + // Give small a prior allocation so small's ratio (0.1) dominates large's (0). + priorAlloc, err := resources.NewResourceFromConf(map[string]string{"memory": "10M"}) + assert.NilError(t, err, "resource create failed") + small.IncAllocatedResource(priorAlloc) + + largeDemand, err := resources.NewResourceFromConf(map[string]string{"memory": "1G"}) + assert.NilError(t, err, "resource create failed") + smallDemand, err := resources.NewResourceFromConf(map[string]string{"memory": "1M"}) + assert.NilError(t, err, "resource create failed") + large.incPendingResource(largeDemand) + small.incPendingResource(smallDemand) + + // Baseline: both queues have freshly-started clocks, so the normal DRF + // sort wins and large is visited first. + fresh := time.Now() + large.SetLastSchedulingAttempt(fresh) + small.SetLastSchedulingAttempt(fresh) + sorted := root.sortQueues() + assert.Equal(t, len(sorted), 2, "both children should be eligible for scheduling") + assert.Equal(t, sorted[0].Name, "large", + "without hoisting the large queue must sort first under DRF") + + // Simulate small being skipped past the starvation threshold. On the next + // sort it must be hoisted in front of large even though its DRF ratio is + // still higher. + small.SetLastSchedulingAttempt(time.Now().Add(-time.Second)) + sorted = root.sortQueues() + assert.Equal(t, len(sorted), 2) + assert.Equal(t, sorted[0].Name, "small", + "starved child must be hoisted to the front regardless of DRF ratio") + assert.Equal(t, sorted[1].Name, "large", + "non-starved sibling must keep its original relative position") +} + +// TestSortQueuesDisabledDelayKeepsDRF guards against accidentally enabling the +// hoist when an operator has turned it off. With delay=0 the DRF sort must be +// returned verbatim even for a queue that has been skipped for a long time. +func TestSortQueuesDisabledDelayKeepsDRF(t *testing.T) { + root, err := createRootQueueWithProps(map[string]string{ + configs.QueueStarvationDelay: "0s", + }) + assert.NilError(t, err, "root create failed") + + large, err := createManagedQueueGuaranteed(root, "large", false, + map[string]string{"memory": "400G"}, + map[string]string{"memory": "360G"}, + nil) + assert.NilError(t, err, "large create failed") + small, err := createManagedQueueGuaranteed(root, "small", false, + map[string]string{"memory": "100M"}, + map[string]string{"memory": "100M"}, + nil) + assert.NilError(t, err, "small create failed") + + priorAlloc, err := resources.NewResourceFromConf(map[string]string{"memory": "10M"}) + assert.NilError(t, err, "resource create failed") + small.IncAllocatedResource(priorAlloc) + + largeDemand, err := resources.NewResourceFromConf(map[string]string{"memory": "1G"}) + assert.NilError(t, err, "resource create failed") + smallDemand, err := resources.NewResourceFromConf(map[string]string{"memory": "1M"}) + assert.NilError(t, err, "resource create failed") + large.incPendingResource(largeDemand) + small.incPendingResource(smallDemand) + + // Small has been "starving" for an hour, but the feature is disabled. + small.SetLastSchedulingAttempt(time.Now().Add(-time.Hour)) + large.SetLastSchedulingAttempt(time.Now()) + + sorted := root.sortQueues() + assert.Equal(t, len(sorted), 2) + assert.Equal(t, sorted[0].Name, "large", + "disabled hoisting must leave the DRF order untouched") +} + +// createRootQueueWithProps is a local helper until the shared utility in +// utilities_test.go grows a props-taking root helper. Kept in this file so it +// does not conflict with existing helpers. +func createRootQueueWithProps(props map[string]string) (*Queue, error) { + return createManagedQueueWithProps(nil, "root", true, nil, props) +} diff --git a/pkg/scheduler/tests/queue_starvation_test.go b/pkg/scheduler/tests/queue_starvation_test.go index c60483847..c8f6a813a 100644 --- a/pkg/scheduler/tests/queue_starvation_test.go +++ b/pkg/scheduler/tests/queue_starvation_test.go @@ -21,6 +21,7 @@ package tests import ( "fmt" "testing" + "time" "gotest.tools/v3/assert" @@ -1844,3 +1845,228 @@ partitions: "CONTRAST: Without priority.offset, sparkpi is starved due to fair-share ratio. "+ "This proves priority.offset is the specific config change that eliminates starvation.") } + +// TestStarvationDelayHoistsSkippedSibling is the end-to-end validation of the +// YUNIKORN-3243 fix. With queue.starvation.delay configured on the parent and +// the clock artificially advanced past that delay, the starved sibling must be +// hoisted to the front of the next scheduling cycle and served — and with it +// the schedulingAttempted flag on its asks must flip to true so the +// autoscaler signal path (inspectOutstandingRequests) is no longer blind. +// +// The clock is advanced via Queue.SetLastSchedulingAttempt rather than with a +// wall-clock sleep so the test stays fast and deterministic. +func TestStarvationDelayHoistsSkippedSibling(t *testing.T) { + // Same 3600:1 asymmetric-guarantees setup as TestQueueStarvationWithPriorAllocation + // but with queue.starvation.delay configured on root. + configData := ` +partitions: + - name: default + queues: + - name: root + submitacl: "*" + properties: + queue.starvation.delay: 100ms + queues: + - name: large + resources: + guaranteed: + memory: 360G + max: + memory: 400G + - name: small + resources: + guaranteed: + memory: 100M + max: + memory: 100M +` + ms := &mockScheduler{} + defer ms.Stop() + + err := ms.Init(configData, false, false) + assert.NilError(t, err, "RegisterResourceManager failed") + + err = ms.proxy.UpdateNode(&si.NodeRequest{ + Nodes: []*si.NodeInfo{ + { + NodeID: "node-1:1234", + Attributes: map[string]string{}, + SchedulableResource: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 500000000000}, + }, + }, + Action: si.NodeInfo_CREATE, + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "NodeRequest failed") + ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) + + // Seed small with a prior allocation so its DRF ratio (0.1) dominates + // large's (0.0), which is what causes the starvation in the first place. + err = ms.addApp("prior-app", "root.small", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "prior-app", 1000) + err = ms.addAppRequest("prior-app", "prior", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 10000000}}, + }, 1) + assert.NilError(t, err) + + smallQueue := ms.getQueue("root.small") + largeQueue := ms.getQueue("root.large") + waitForPendingQueueResource(t, smallQueue, 10000000, 1000) + ms.scheduler.MultiStepSchedule(3) + ms.mockRM.waitForAllocations(t, 1, 1000) + + // Add the noisy neighbour on the large queue plus the small sparkpi ask + // that would normally be starved indefinitely. + err = ms.addApp("large-app", "root.large", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "large-app", 1000) + err = ms.addAppRequest("large-app", "large", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 1000000000}}, + }, 20) + assert.NilError(t, err) + + err = ms.addApp("sparkpi-app", "root.small", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "sparkpi-app", 1000) + err = ms.addAppRequest("sparkpi-app", "sparkpi", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 1000000}}, + }, 1) + assert.NilError(t, err) + + waitForPendingQueueResource(t, largeQueue, 20*1000000000, 1000) + waitForPendingQueueResource(t, smallQueue, 1000000, 1000) + + // A single quick cycle still starves small: its clock has only just + // started (demand arrived moments ago) so it is inside the grace window + // and the DRF sort wins normally. + ms.scheduler.MultiStepSchedule(1) + sparkpiApp := ms.getApplication("sparkpi-app") + assert.Equal(t, + int(sparkpiApp.GetAllocatedResource().Resources[siCommon.Memory]), 0, + "baseline: within the grace window, DRF sort is unchanged and small is still starved") + sparkpiAsk := sparkpiApp.GetAllocationAsk("sparkpi-0") + assert.Assert(t, sparkpiAsk != nil, "sparkpi ask should exist") + assert.Check(t, !sparkpiAsk.IsSchedulingAttempted(), + "baseline: schedulingAttempted should still be false inside the grace window") + + // Simulate small being skipped past the starvation delay without waiting + // for wall-clock time. The parent's next sortQueues() must hoist small + // because its lastSchedulingAttempt is now older than any other sibling's. + smallQueue.SetLastSchedulingAttempt(time.Now().Add(-time.Second)) + + // A single cycle is enough: the hoist runs inside sortQueues and small is + // visited before large. + ms.scheduler.MultiStepSchedule(1) + + sparkpiMem := int(sparkpiApp.GetAllocatedResource().Resources[siCommon.Memory]) + assert.Equal(t, sparkpiMem, 1000000, + "FIX CONFIRMED: once small has been skipped past queue.starvation.delay, "+ + "the parent's sort hoists it to the front and the ask is served on "+ + "the very next cycle even though its DRF ratio is still dominant.") + + assert.Check(t, sparkpiAsk.IsSchedulingAttempted(), + "AUTOSCALER SIGNAL RESTORED: visiting the hoisted queue runs "+ + "app.tryAllocate() which flips schedulingAttempted to true, so "+ + "inspectOutstandingRequests will now include this ask.") + + t.Logf("queue.starvation.delay eliminates starvation: sparkpi served after hoist, "+ + "schedulingAttempted=%v", sparkpiAsk.IsSchedulingAttempted()) +} + +// TestStarvationDelayDisabledPreservesDRFBehaviour pins the behavioural +// contract when the hoisting feature is explicitly turned off on the parent: +// the scheduler must keep the pre-existing DRF-only sort, reproducing the +// original starvation bug. This is important so operators who rely on the +// legacy behaviour can opt out cleanly. +func TestStarvationDelayDisabledPreservesDRFBehaviour(t *testing.T) { + configData := ` +partitions: + - name: default + queues: + - name: root + submitacl: "*" + properties: + queue.starvation.delay: "0s" + queues: + - name: large + resources: + guaranteed: + memory: 360G + max: + memory: 400G + - name: small + resources: + guaranteed: + memory: 100M + max: + memory: 100M +` + ms := &mockScheduler{} + defer ms.Stop() + + err := ms.Init(configData, false, false) + assert.NilError(t, err, "RegisterResourceManager failed") + + err = ms.proxy.UpdateNode(&si.NodeRequest{ + Nodes: []*si.NodeInfo{ + { + NodeID: "node-1:1234", + Attributes: map[string]string{}, + SchedulableResource: &si.Resource{ + Resources: map[string]*si.Quantity{ + "memory": {Value: 500000000000}, + }, + }, + Action: si.NodeInfo_CREATE, + }, + }, + RmID: "rm:123", + }) + assert.NilError(t, err, "NodeRequest failed") + ms.mockRM.waitForAcceptedNode(t, "node-1:1234", 1000) + + err = ms.addApp("prior-app", "root.small", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "prior-app", 1000) + err = ms.addAppRequest("prior-app", "prior", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 10000000}}, + }, 1) + assert.NilError(t, err) + smallQueue := ms.getQueue("root.small") + waitForPendingQueueResource(t, smallQueue, 10000000, 1000) + ms.scheduler.MultiStepSchedule(3) + ms.mockRM.waitForAllocations(t, 1, 1000) + + err = ms.addApp("large-app", "root.large", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "large-app", 1000) + err = ms.addAppRequest("large-app", "large", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 1000000000}}, + }, 20) + assert.NilError(t, err) + err = ms.addApp("sparkpi-app", "root.small", "default") + assert.NilError(t, err) + ms.mockRM.waitForAcceptedApplication(t, "sparkpi-app", 1000) + err = ms.addAppRequest("sparkpi-app", "sparkpi", &si.Resource{ + Resources: map[string]*si.Quantity{"memory": {Value: 1000000}}, + }, 1) + assert.NilError(t, err) + waitForPendingQueueResource(t, ms.getQueue("root.large"), 20*1000000000, 1000) + waitForPendingQueueResource(t, smallQueue, 1000000, 1000) + + // Even after artificially advancing the clock far past any sensible delay, + // hoisting must stay disabled because the parent is configured with 0. + smallQueue.SetLastSchedulingAttempt(time.Now().Add(-time.Hour)) + ms.scheduler.MultiStepSchedule(20) + + sparkpiApp := ms.getApplication("sparkpi-app") + sparkpiMem := int(sparkpiApp.GetAllocatedResource().Resources[siCommon.Memory]) + assert.Equal(t, sparkpiMem, 0, + "With queue.starvation.delay=0, the legacy DRF-only behaviour must be "+ + "preserved: small remains starved for the duration of the test.") +}