Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/common/configs/configvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ const (
PreemptionPolicy = "preemption.policy"
PreemptionDelay = "preemption.delay"
QuotaPreemptionDelay = "quota.preemption.delay"
QueueStarvationDelay = "queue.starvation.delay"

// app sort priority values
ApplicationSortPriorityEnabled = "enabled"
Expand All @@ -75,6 +76,12 @@ var DefaultQuotaPreemptionDelay time.Duration = 0
var DefaultPreemptionDelay = 30 * time.Second
var DefaultAskBackOffDelay = 30 * time.Second

// DefaultQueueStarvationDelay is the default value for the queue.starvation.delay property.
// When a child queue has pending demand but the parent's sort has skipped it for longer than
// this duration, the parent's sort will hoist the starving child to the front of the order.
// A value of 0 disables the hoisting behaviour on that parent queue. See YUNIKORN-3243.
var DefaultQueueStarvationDelay = 15 * time.Second

// QueueNameRegExp to validate the name of a queue.
// A queue can be a username with the dot replaced. Most systems allow a 32 character username.
// The queue name must thus allow for at least that length with the replacement of dots.
Expand Down
170 changes: 169 additions & 1 deletion pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"errors"
"fmt"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -94,6 +95,15 @@ type Queue struct {
isQuotaPreemptionRunning bool
unschedAskBackoff uint64
askBackoffDelay time.Duration
// starvationDelay controls how long a child queue can remain unvisited by this
// queue's TryAllocate sort before it is hoisted to the front of the sibling order.
// 0 disables the hoisting behaviour on this queue. See YUNIKORN-3243.
starvationDelay time.Duration
// lastSchedulingAttempt is the last wall-clock time TryAllocate descended into
// this queue. Used by the parent's sort to detect queues that have been skipped
// for too long despite having pending demand. Zero value means "not tracked"
// (i.e. this queue currently has no pending demand). See YUNIKORN-3243.
lastSchedulingAttempt time.Time

locking.RWMutex
}
Expand All @@ -119,6 +129,8 @@ func newBlankQueue() *Queue {
quotaPreemptionDelay: 0,
quotaPreemptionStartTime: time.Time{},
askBackoffDelay: configs.DefaultAskBackOffDelay,
starvationDelay: configs.DefaultQueueStarvationDelay,
lastSchedulingAttempt: time.Time{},
}
}

Expand Down Expand Up @@ -283,6 +295,20 @@ func convertDelay(value string, def time.Duration) (time.Duration, error) {
return result, nil
}

// convertStarvationDelay parses a duration string for queue.starvation.delay.
// Unlike convertDelay, a value of zero is allowed and means "feature disabled on this queue".
// Negative values fall back to the default. See YUNIKORN-3243.
func convertStarvationDelay(value string) (time.Duration, error) {
result, err := time.ParseDuration(value)
if err != nil {
return configs.DefaultQueueStarvationDelay, err
}
if int64(result) < int64(0) {
return configs.DefaultQueueStarvationDelay, fmt.Errorf("starvation delay must be non-negative: %s", value)
}
return result, nil
}

func priorityOffset(value string) (int32, error) {
intValue, err := strconv.ParseInt(value, 10, 32)
if err != nil {
Expand Down Expand Up @@ -722,6 +748,12 @@ func (sq *Queue) UpdateQueueProperties(oldMaxResource *resources.Resource) {
zap.Error(err))
}
sq.setPreemptionTime(oldMaxResource, oldDelay)
case configs.QueueStarvationDelay:
sq.starvationDelay, err = convertStarvationDelay(value)
if err != nil {
log.Log(log.SchedQueue).Debug("queue starvation delay configuration error",
zap.Error(err))
}
default:
// skip unknown properties just log them
log.Log(log.SchedQueue).Debug("queue property skipped",
Expand Down Expand Up @@ -926,7 +958,14 @@ func (sq *Queue) incPendingResource(delta *resources.Resource) {
// update this queue
sq.Lock()
defer sq.Unlock()
wasZero := !resources.StrictlyGreaterThanZero(sq.pending)
sq.pending = resources.Add(sq.pending, delta)
// Start the starvation clock when pending transitions from zero to non-zero.
// This gives a newly-pending queue a full grace window before it can be hoisted.
// See YUNIKORN-3243.
if wasZero && resources.StrictlyGreaterThanZero(sq.pending) && sq.lastSchedulingAttempt.IsZero() {
sq.lastSchedulingAttempt = time.Now()
}
sq.updatePendingResourceMetrics()
}

Expand Down Expand Up @@ -955,7 +994,71 @@ func (sq *Queue) decPendingResource(delta *resources.Resource) {
// If we prune the resource first and the resource become nil after pruning,
// the metrics will not be updated with nil resource, this is not expected.
sq.pending.Prune()
// Pending dropped to zero: stop tracking starvation until new demand arrives.
// See YUNIKORN-3243.
if !resources.StrictlyGreaterThanZero(sq.pending) {
sq.lastSchedulingAttempt = time.Time{}
}
}
}

// GetStarvationDelay returns the configured starvation delay for this queue.
// When > 0, the queue's sortQueues() will hoist any child with pending demand
// that has not been visited within this duration to the front of the sort order.
// See YUNIKORN-3243.
func (sq *Queue) GetStarvationDelay() time.Duration {
sq.RLock()
defer sq.RUnlock()
return sq.starvationDelay
}

// GetLastSchedulingAttempt returns the wall-clock time at which TryAllocate last
// descended into this queue. A zero value indicates the queue has no pending
// demand and is not currently being tracked. See YUNIKORN-3243.
func (sq *Queue) GetLastSchedulingAttempt() time.Time {
sq.RLock()
defer sq.RUnlock()
return sq.lastSchedulingAttempt
}

// SetLastSchedulingAttempt overrides the last-scheduling-attempt timestamp.
// This is only intended for tests that need to simulate a queue that has been
// skipped for an extended period.
func (sq *Queue) SetLastSchedulingAttempt(t time.Time) {
sq.Lock()
defer sq.Unlock()
sq.lastSchedulingAttempt = t
}

// markSchedulingAttempted records that TryAllocate has descended into this queue.
// Called at the entry of TryAllocate so that siblings whose TryAllocate was never
// reached (because an earlier sibling's allocation short-circuited the loop) keep
// their older timestamp and can be hoisted by the parent's next sort.
// See YUNIKORN-3243.
func (sq *Queue) markSchedulingAttempted() {
sq.Lock()
defer sq.Unlock()
sq.lastSchedulingAttempt = time.Now()
}

// isStarvedAt reports whether this queue has pending demand and has not been
// visited by TryAllocate within the given delay, evaluated as of now.
// A zero or negative delay disables the check. A queue that has never had
// its clock started (zero lastSchedulingAttempt) is not considered starving.
// See YUNIKORN-3243.
func (sq *Queue) isStarvedAt(now time.Time, delay time.Duration) bool {
if delay <= 0 {
return false
}
sq.RLock()
defer sq.RUnlock()
if sq.lastSchedulingAttempt.IsZero() {
return false
}
if !resources.StrictlyGreaterThanZero(sq.pending) {
return false
}
return now.Sub(sq.lastSchedulingAttempt) > delay
}

// AddApplication adds the application to the queue. All checks are assumed to have passed before we get here.
Expand Down Expand Up @@ -1402,12 +1505,72 @@ func (sq *Queue) sortQueues() []*Queue {
sortedMaxFairResources = append(sortedMaxFairResources, child.GetFairMaxResource())
}
}
// Sort the queues
// Sort the queues using the configured policy
sortQueue(sortedQueues, sortedMaxFairResources, sq.getSortType(), sq.IsPrioritySortEnabled())

// Bounded-visit guarantee: hoist any child with pending demand that has not
// been visited within this queue's starvation delay to the front of the
// order. This preserves the relative order of both starved and non-starved
// children as produced by the policy sort above. See YUNIKORN-3243.
if delay := sq.GetStarvationDelay(); delay > 0 && len(sortedQueues) > 1 {
hoistStarvedQueues(sortedQueues, time.Now(), delay)
}

return sortedQueues
}

// hoistStarvedQueues moves every child whose starvation clock has exceeded
// delay to the front of queues. Inside the starved group, queues are ordered
// by ascending lastSchedulingAttempt (oldest first) so the most-neglected
// queue wins the hoist. The relative order of the non-starved group is
// preserved. Lock free, invoked after the primary sort. See YUNIKORN-3243.
func hoistStarvedQueues(queues []*Queue, now time.Time, delay time.Duration) {
if delay <= 0 || len(queues) < 2 {
return
}
starvedIdx := make([]int, 0, len(queues))
for i, q := range queues {
if q.isStarvedAt(now, delay) {
starvedIdx = append(starvedIdx, i)
}
}
if len(starvedIdx) == 0 {
return
}
starved := make([]*Queue, len(starvedIdx))
for i, idx := range starvedIdx {
starved[i] = queues[idx]
}
// Oldest last-attempt first so queues that have been skipped the longest
// are served before queues that were visited more recently. Stable sort
// keeps deterministic ordering for equal timestamps.
sort.SliceStable(starved, func(i, j int) bool {
return starved[i].GetLastSchedulingAttempt().Before(starved[j].GetLastSchedulingAttempt())
})
nonStarved := make([]*Queue, 0, len(queues)-len(starved))
starvedSet := make(map[int]struct{}, len(starvedIdx))
for _, idx := range starvedIdx {
starvedSet[idx] = struct{}{}
}
for i, q := range queues {
if _, ok := starvedSet[i]; !ok {
nonStarved = append(nonStarved, q)
}
}
copy(queues, starved)
copy(queues[len(starved):], nonStarved)
if log.Log(log.SchedQueue).Core().Enabled(zap.DebugLevel) {
names := make([]string, 0, len(queues))
for _, q := range queues {
names = append(names, q.QueuePath)
}
log.Log(log.SchedQueue).Debug("starvation hoist reordered sibling queues",
zap.Duration("delay", delay),
zap.Int("starvedCount", len(starved)),
zap.Strings("order", names))
}
}

// getHeadRoom returns the headroom for the queue. This can never be more than the headroom for the parent.
// In case there are no nodes in a newly started cluster and no queues have a limit configured this call
// will return nil.
Expand Down Expand Up @@ -1612,6 +1775,11 @@ func (sq *Queue) canRunApp(appID string) bool {
// Applications are sorted based on the application sortPolicy. Applications without pending resources are skipped.
// Lock free call this all locks are taken when needed in called functions
func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() NodeIterator, getnode func(string) *Node, allowPreemption, quotaPreemption bool) *AllocationResult {
// Record the visit before any further work. Combined with the hoisting logic
// in the parent's sortQueues(), this guarantees that a queue with pending
// demand cannot be skipped indefinitely regardless of sibling DRF ratios.
// See YUNIKORN-3243.
sq.markSchedulingAttempted()
if quotaPreemption && sq.shouldTriggerPreemption() {
go func() {
log.Log(log.SchedQueue).Info("Preconditions has passed trigger preemption to enforce new max resources",
Expand Down
Loading