diff --git a/pkg/common/configs/config.go b/pkg/common/configs/config.go index 32a4ef2de..133a46163 100644 --- a/pkg/common/configs/config.go +++ b/pkg/common/configs/config.go @@ -47,13 +47,14 @@ type SchedulerConfig struct { // - the preemption configuration for the partition // - user group resolver type (os, ldap, "") type PartitionConfig struct { - Name string - Queues []QueueConfig - PlacementRules []PlacementRule `yaml:",omitempty" json:",omitempty"` - Limits []Limit `yaml:",omitempty" json:",omitempty"` - Preemption PartitionPreemptionConfig `yaml:",omitempty" json:",omitempty"` - NodeSortPolicy NodeSortingPolicy `yaml:",omitempty" json:",omitempty"` - UserGroupResolver UserGroupResolver `yaml:",omitempty" json:",omitempty"` + Name string + Queues []QueueConfig + PlacementRules []PlacementRule `yaml:",omitempty" json:",omitempty"` + Limits []Limit `yaml:",omitempty" json:",omitempty"` + Preemption PartitionPreemptionConfig `yaml:",omitempty" json:",omitempty"` + NodeSortPolicy NodeSortingPolicy `yaml:",omitempty" json:",omitempty"` + UserGroupResolver UserGroupResolver `yaml:",omitempty" json:",omitempty"` + TryNodesThreadCount int `yaml:",omitempty" json:",omitempty"` } type UserGroupResolver struct { diff --git a/pkg/common/configs/configvalidator.go b/pkg/common/configs/configvalidator.go index 76b537f7e..fc78c983e 100644 --- a/pkg/common/configs/configvalidator.go +++ b/pkg/common/configs/configvalidator.go @@ -758,6 +758,11 @@ func Validate(newConfig *SchedulerConfig) error { return fmt.Errorf("duplicate partition name found with name %s", partition.Name) } partitionMap[strings.ToLower(partition.Name)] = true + + if partition.TryNodesThreadCount <= 0 { + partition.TryNodesThreadCount = 1 + } + // check the queue structure err := checkQueuesStructure(&partition) if err != nil { diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 542447546..dd498b143 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -982,7 +982,9 @@ func (sa *Application) canReplace(request *Allocation) bool { } // tryAllocate will perform a regular allocation of a pending request, includes placeholders. -func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption bool, preemptionDelay time.Duration, preemptAttemptsRemaining *int, nodeIterator func() NodeIterator, fullNodeIterator func() NodeIterator, getNodeFn func(string) *Node) *AllocationResult { +func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption bool, tryNodesThreadCount int, + preemptionDelay time.Duration, preemptAttemptsRemaining *int, nodeIterator func() NodeIterator, + fullNodeIterator func() NodeIterator, getNodeFn func(string) *Node) *AllocationResult { sa.Lock() defer sa.Unlock() if sa.sortedRequests == nil { @@ -1043,8 +1045,15 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption iterator := nodeIterator() if iterator != nil { - if result := sa.tryNodes(request, iterator); result != nil { - // have a candidate return it + var result *AllocationResult + + if tryNodesThreadCount > 1 { + result = sa.tryNodesInParallel(request, iterator, tryNodesThreadCount) + } else { + result = sa.tryNodes(request, iterator) + } + + if result != nil { return result } @@ -1460,6 +1469,171 @@ func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator NodeIterator, // Try all the nodes for a request. The resultType is an allocation or reservation of a node. // New allocations can only be reserved after a delay. +func (sa *Application) tryNodesInParallel(ask *Allocation, iterator NodeIterator, tryNodesThreadCount int) *AllocationResult { //nolint:funlen + var nodeToReserve *Node + scoreReserved := math.Inf(1) + allocKey := ask.GetAllocationKey() + reserved := sa.reservations[allocKey] + var allocResult *AllocationResult + var predicateErrors map[string]int + + var mu sync.Mutex + + // Channel to signal completion + done := make(chan struct{}) + defer close(done) + + // Function to process each batch + processBatch := func(batch []*Node) { + var wg sync.WaitGroup + semaphore := make(chan struct{}, tryNodesThreadCount) + candidateNodes := make([]*Node, len(batch)) + errors := make([]error, len(batch)) + + for idx, node := range batch { + wg.Add(1) + semaphore <- struct{}{} + go func(idx int, node *Node) { + defer wg.Done() + defer func() { <-semaphore }() + dryRunResult, err := sa.tryNodeDryRun(node, ask) + + mu.Lock() + defer mu.Unlock() + if err != nil { + errors[idx] = err + } else if dryRunResult != nil { + candidateNodes[idx] = node + } + }(idx, node) + } + + wg.Wait() + + for _, err := range errors { + if err != nil { + mu.Lock() + if predicateErrors == nil { + predicateErrors = make(map[string]int) + } + predicateErrors[err.Error()]++ + mu.Unlock() + } + } + + // Process dry-run candidateNodes sequentially within the batch + for _, candidateNode := range candidateNodes { + if candidateNode == nil { + continue + } + tryNodeStart := time.Now() + result, err := sa.tryNode(candidateNode, ask) + if err != nil { + if predicateErrors == nil { + predicateErrors = make(map[string]int) + } + predicateErrors[err.Error()]++ + } else if result != nil { + metrics.GetSchedulerMetrics().ObserveTryNodeLatency(tryNodeStart) + if reserved != nil { + if reserved.nodeID != candidateNode.NodeID { + log.Log(log.SchedApplication).Debug("allocate picking reserved alloc during non reserved allocate", + zap.String("appID", sa.ApplicationID), + zap.String("reserved nodeID", reserved.nodeID), + zap.String("allocationKey", allocKey)) + result.ReservedNodeID = reserved.nodeID + } else { + log.Log(log.SchedApplication).Debug("allocate found reserved alloc during non reserved allocate", + zap.String("appID", sa.ApplicationID), + zap.String("nodeID", candidateNode.NodeID), + zap.String("allocationKey", allocKey)) + } + result.ResultType = AllocatedReserved + allocResult = result + return + } + allocResult = result + return + } + askAge := time.Since(ask.GetCreateTime()) + if reserved == nil && askAge > reservationDelay { + log.Log(log.SchedApplication).Debug("app reservation check", + zap.String("allocationKey", allocKey), + zap.Time("createTime", ask.GetCreateTime()), + zap.Duration("askAge", askAge), + zap.Duration("reservationDelay", reservationDelay)) + score := candidateNode.GetFitInScoreForAvailableResource(ask.GetAllocatedResource()) + if score < scoreReserved { + scoreReserved = score + nodeToReserve = candidateNode + } + } + } + } + + // Iterate over nodes and process in batches + var batch []*Node + iterator.ForEachNode(func(node *Node) bool { + batch = append(batch, node) + if len(batch) >= tryNodesThreadCount { + processBatch(batch) + batch = nil + if allocResult != nil { + return false // Exit iteration if an allocation has been made + } + } + return true + }) + // Process any remaining nodes in the last batch + if len(batch) > 0 && allocResult == nil { + processBatch(batch) + } + + if allocResult != nil { + return allocResult + } + + if predicateErrors != nil { + ask.SendPredicatesFailedEvent(predicateErrors) + } + + if nodeToReserve != nil && !nodeToReserve.IsReserved() { + log.Log(log.SchedApplication).Debug("found candidate node for app reservation", + zap.String("appID", sa.ApplicationID), + zap.String("nodeID", nodeToReserve.NodeID), + zap.String("allocationKey", allocKey), + zap.Int("reservations", len(sa.reservations))) + if nodeToReserve.preReserveConditions(ask) != nil { + return nil + } + return newReservedAllocationResult(nodeToReserve.NodeID, ask) + } + + return nil +} + +func (sa *Application) tryNodeDryRun(node *Node, ask *Allocation) (*AllocationResult, error) { + toAllocate := ask.GetAllocatedResource() + allocationKey := ask.GetAllocationKey() + + if !node.IsSchedulable() || !node.FitInNode(ask.GetAllocatedResource()) { + return nil, nil + } + + // create the key for the reservation + if !node.preAllocateCheck(toAllocate, allocationKey) { + // skip schedule onto node + return nil, nil + } + // skip the node if conditions can not be satisfied + if err := node.preAllocateConditions(ask); err != nil { + return nil, err + } + + result := newAllocatedAllocationResult(node.NodeID, ask) + return result, nil +} + func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *AllocationResult { var nodeToReserve *Node scoreReserved := math.Inf(1) diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 2323466a1..e1fccb872 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -2156,7 +2156,7 @@ func TestTryAllocateNoRequests(t *testing.T) { app := newApplication(appID1, "default", "root.unknown") preemptionAttemptsRemaining := 0 - result := app.tryAllocate(node.GetAvailableResource(), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result := app.tryAllocate(node.GetAvailableResource(), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Check(t, result == nil, "unexpected result") } @@ -2181,7 +2181,7 @@ func TestTryAllocateFit(t *testing.T) { assert.NilError(t, err) preemptionAttemptsRemaining := 0 - result := app.tryAllocate(node.GetAvailableResource(), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result := app.tryAllocate(node.GetAvailableResource(), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result != nil, "alloc expected") assert.Assert(t, result.Request != nil, "alloc expected") @@ -2228,28 +2228,35 @@ func TestTryAllocatePreemptQueue(t *testing.T) { preemptionAttemptsRemaining := 10 - result1 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result1 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result1 != nil, "result1 expected") alloc1 := result1.Request assert.Assert(t, alloc1 != nil, "alloc1 expected") - result2 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result2 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result2 != nil, "result2 expected") alloc2 := result2.Request assert.Assert(t, alloc2 != nil, "alloc2 expected") // preemption max attempts exhausted preemptionAttemptsRemaining = 0 - result3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result3 == nil, "result3 not expected") assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been preempted") log := ask3.GetAllocationLog() - assert.Equal(t, log[0].Message, common.PreemptionMaxAttemptsExhausted) + var found bool + for _, entry := range log { + if entry.Message == common.PreemptionMaxAttemptsExhausted { + found = true + break + } + } + assert.Assert(t, found, "PreemptionMaxAttemptsExhausted message not found in allocation log") assert.Equal(t, preemptionAttemptsRemaining, 0) preemptionAttemptsRemaining = 10 // on first attempt, not enough time has passed - result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result3 == nil, "result3 not expected") assert.Assert(t, !alloc2.IsPreempted(), "alloc2 should not have been preempted") assertAllocationLog(t, ask3) @@ -2257,7 +2264,7 @@ func TestTryAllocatePreemptQueue(t *testing.T) { // pass the time and try again ask3.createTime = ask3.createTime.Add(-30 * time.Second) - result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 0}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result3 != nil && result3.Request != nil && result3.ResultType == Reserved, "alloc3 should be a reservation") assert.Assert(t, alloc2.IsPreempted(), "alloc2 should have been preempted") assert.Equal(t, preemptionAttemptsRemaining, 9) @@ -2268,14 +2275,14 @@ func TestTryAllocatePreemptNode(t *testing.T) { preemptionAttemptsRemaining := 10 // preemption delay not yet passed, so preemption should fail - result3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result3 == nil, "result3 expected") assert.Assert(t, !allocs[1].IsPreempted(), "alloc1 should have been preempted") assertAllocationLog(t, ask3) // pass the time and try again ask3.createTime = ask3.createTime.Add(-30 * time.Second) - result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result3 = app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result3 != nil, "result3 expected") assert.Equal(t, Reserved, result3.ResultType, "expected reservation") alloc3 := result3.Request @@ -2340,7 +2347,7 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID // consume capacity with 'unlimited' app for _, r := range []*resources.Resource{resources.NewResourceFromMap(map[string]resources.Quantity{"first": 40}), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 39})} { - result0 := app0.tryAllocate(r, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result0 := app0.tryAllocate(r, true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result0 != nil, "result0 expected") alloc0 := result0.Request assert.Assert(t, alloc0 != nil, "alloc0 expected") @@ -2351,7 +2358,7 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID allocs := make([]*Allocation, 0) for _, r := range []*resources.Resource{resources.NewResourceFromMap(map[string]resources.Quantity{"first": 28}), resources.NewResourceFromMap(map[string]resources.Quantity{"first": 23})} { var alloc1 *Allocation - result1 := app1.tryAllocate(r, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result1 := app1.tryAllocate(r, true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result1 != nil, "result1 expected") alloc1 = result1.Request assert.Assert(t, result1.Request != nil, "alloc1 expected") @@ -2361,7 +2368,7 @@ func createPreemptNodeTestSetup(t *testing.T) (func() NodeIterator, func(NodeID // on first attempt, should see a reservation since we're after the reservation timeout ask3.createTime = ask3.createTime.Add(-10 * time.Second) - result3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result3 := app2.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result3 != nil, "result3 expected") alloc3 := result3.Request assert.Assert(t, alloc3 != nil, "alloc3 not expected") @@ -2397,7 +2404,7 @@ func TestTryAllocatePreemptNodeWithReservations(t *testing.T) { // pass the time and try again ask4.createTime = ask4.createTime.Add(-30 * time.Second) reservationWaitTimeout = -60 * time.Second - result3 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result3 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result3 != nil, "result3 expected") assert.Equal(t, Reserved, result3.ResultType, "expected reservation") alloc3 := result3.Request @@ -2416,13 +2423,13 @@ func TestTryAllocatePreemptNodeWithReservationsWithHighPriority(t *testing.T) { // pass the time and try again ask4.createTime = ask4.createTime.Add(-30 * time.Second) reservationWaitTimeout = -60 * time.Second - result3 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result3 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result3 == nil, "result3 expected") // Set higher priority than the reserved ask priority ask4.priority = math.MaxInt32 ask4.preemptCheckTime = ask4.preemptCheckTime.Add(-30 * time.Second) - result4 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result4 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result4 != nil, "result3 expected") assert.Equal(t, Reserved, result4.ResultType, "expected reservation") alloc3 := result4.Request @@ -2452,7 +2459,7 @@ func TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel(t *testing.T) // on first attempt, should see a reservation on node2 since we're after the reservation timeout var alloc11 *Allocation ask5.createTime = ask5.createTime.Add(-10 * time.Second) - result1 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result1 := app1.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result1 != nil, "result1 expected") alloc11 = result1.Request assert.Equal(t, "alloc5", alloc11.allocationKey, "wrong node assignment") @@ -2465,7 +2472,7 @@ func TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel(t *testing.T) // Set higher priority than the reserved ask priority but no preemption because reserved ask waiting time not exceeded ask4.priority = 1 - result3 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result3 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result3 == nil, "result3 expected") // Ensure reserved ask waiting time exceeds @@ -2474,7 +2481,7 @@ func TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel(t *testing.T) ask4.createTime = ask4.createTime.Add(-30 * time.Second) reservationWaitTimeout = -60 * time.Second ask4.preemptCheckTime = ask4.preemptCheckTime.Add(-30 * time.Second) - result4 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result4 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result4 == nil, "result3 expected") // Ensure reserved ask waiting time exceeds @@ -2485,7 +2492,7 @@ func TestTryAllocatePreemptNodeWithReservationsNotPossibleToCancel(t *testing.T) ask4.createTime = ask4.createTime.Add(-30 * time.Second) reservationWaitTimeout = -60 * time.Second ask4.preemptCheckTime = ask4.preemptCheckTime.Add(-30 * time.Second) - result5 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result5 := app3.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 18}), true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Assert(t, result5 != nil, "result3 expected") assert.Equal(t, Reserved, result5.ResultType, "expected reservation") alloc3 := result5.Request @@ -2808,7 +2815,7 @@ func TestRequestDoesNotFitQueueEvents(t *testing.T) { attempts := 0 // try to allocate - app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) + app.tryAllocate(headroom, true, 1, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) assert.Equal(t, 1, len(eventSystem.Events)) event := eventSystem.Events[0] assert.Equal(t, si.EventRecord_REQUEST, event.Type) @@ -2819,14 +2826,14 @@ func TestRequestDoesNotFitQueueEvents(t *testing.T) { assert.Equal(t, "Request 'alloc-0' does not fit in queue 'root.default' (requested map[memory:100 vcores:10], available map[memory:0 vcores:0])", event.Message) // second attempt - no new event - app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) + app.tryAllocate(headroom, true, 1, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) assert.Equal(t, 1, len(eventSystem.Events)) // third attempt with enough headroom - new event eventSystem.Reset() headroom, err = resources.NewResourceFromConf(map[string]string{"memory": "1000", "vcores": "1000"}) assert.NilError(t, err) - app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) + app.tryAllocate(headroom, true, 1, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) assert.Equal(t, 1, len(eventSystem.Events)) event = eventSystem.Events[0] assert.Equal(t, si.EventRecord_REQUEST, event.Type) @@ -2879,7 +2886,7 @@ func TestRequestDoesNotFitUserQuotaQueueEvents(t *testing.T) { attempts := 0 // try to allocate - app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) + app.tryAllocate(headroom, true, 1, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) assert.Equal(t, 1, len(eventSystem.Events)) event := eventSystem.Events[0] assert.Equal(t, si.EventRecord_REQUEST, event.Type) @@ -2890,7 +2897,7 @@ func TestRequestDoesNotFitUserQuotaQueueEvents(t *testing.T) { assert.Equal(t, "Request 'alloc-0' exceeds the available user quota (requested map[memory:100 vcores:10], available map[memory:1 vcores:1])", event.Message) // second attempt - no new event - app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) + app.tryAllocate(headroom, true, 1, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) assert.Equal(t, 1, len(eventSystem.Events)) // third attempt with enough headroom - new event @@ -2898,7 +2905,7 @@ func TestRequestDoesNotFitUserQuotaQueueEvents(t *testing.T) { conf.Limits[0].MaxResources = nil err = ugm.GetUserManager().UpdateConfig(conf, "root") assert.NilError(t, err) - app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) + app.tryAllocate(headroom, true, 1, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) assert.Equal(t, 1, len(eventSystem.Events)) event = eventSystem.Events[0] assert.Equal(t, si.EventRecord_REQUEST, event.Type) @@ -2927,7 +2934,7 @@ func TestAllocationFailures(t *testing.T) { attempts := 0 // case #1: not enough queue headroom - app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) + app.tryAllocate(headroom, true, 1, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) assert.Equal(t, 1, len(ask.allocLog)) assert.Equal(t, int32(1), ask.allocLog[NotEnoughQueueQuota].Count) @@ -2954,7 +2961,7 @@ func TestAllocationFailures(t *testing.T) { assert.NilError(t, err) headroom, err = resources.NewResourceFromConf(map[string]string{"memory": "1000", "vcores": "1000"}) assert.NilError(t, err) - app.tryAllocate(headroom, true, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) + app.tryAllocate(headroom, true, 1, time.Second, &attempts, nilNodeIterator, nilNodeIterator, nilGetNode) assert.Equal(t, 2, len(ask.allocLog)) assert.Equal(t, int32(1), ask.allocLog[NotEnoughUserQuota].Count) } @@ -3243,7 +3250,7 @@ func TestPredicateFailedEvents(t *testing.T) { plugins.RegisterSchedulerPlugin(mockPlugin) defer plugins.UnregisterSchedulerPlugins() - app.tryAllocate(headroom, false, time.Second, &attempts, func() NodeIterator { + app.tryAllocate(headroom, false, 1, time.Second, &attempts, func() NodeIterator { return &testIterator{} }, nilNodeIterator, nilGetNode) assert.Equal(t, 1, len(eventSystem.Events)) @@ -3299,7 +3306,7 @@ func TestRequiredNodePreemption(t *testing.T) { // allocate ask headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) - result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result := app.tryAllocate(headRoom, true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1") assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key") @@ -3311,7 +3318,7 @@ func TestRequiredNodePreemption(t *testing.T) { assert.NilError(t, err, "could not add ask-2") // try to allocate ask2 with node being full - expect a reservation - result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result = app.tryAllocate(headRoom, true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved") assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key") err = app.Reserve(node, ask2) @@ -3378,7 +3385,7 @@ func TestRequiredNodePreemptionFailed(t *testing.T) { // allocate ask headRoom := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 50}) - result := app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result := app.tryAllocate(headRoom, true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Equal(t, result.ResultType, Allocated, "could not allocate ask-1") assert.Equal(t, result.Request.allocationKey, "ask-1", "unexpected allocation key") @@ -3390,7 +3397,7 @@ func TestRequiredNodePreemptionFailed(t *testing.T) { assert.NilError(t, err, "could not add ask-2") // try to allocate ask2 with node being full - expect a reservation - result = app.tryAllocate(headRoom, true, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result = app.tryAllocate(headRoom, true, 1, 30*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Equal(t, result.ResultType, Reserved, "allocation result is not reserved") assert.Equal(t, result.Request.allocationKey, "ask-2", "unexpected allocation key") err = app.Reserve(node, ask2) @@ -3834,3 +3841,37 @@ func TestAppSubmissionTime(t *testing.T) { app.AddAllocation(alloc2) assert.Equal(t, app.submissionTime, time.Unix(0, 30), "app submission time is not set properly") } + +func TestTryNodesInParallel(t *testing.T) { + node1 := newNode(nodeID1, map[string]resources.Quantity{"first": 5}) + node2 := newNode(nodeID2, map[string]resources.Quantity{"first": 5}) + iterator := getNodeIteratorFn(node1, node2) + + app := newApplication(appID0, "default", "root.default") + queue, err := createRootQueue(map[string]string{"first": "5"}) + assert.NilError(t, err, "queue create failed") + app.queue = queue + + res := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 5}) + ask := newAllocationAsk(aKey, appID0, res) + err = app.AddAllocationAsk(ask) + assert.NilError(t, err, "ask should have been added to app") + + // successful allocation + result := app.tryNodesInParallel(ask, iterator(), 2) + assert.Assert(t, result != nil, "result should not be nil") + assert.Equal(t, Allocated, result.ResultType, "result type should be Allocated") + assert.Assert(t, result.NodeID == nodeID1 || result.NodeID == nodeID2, "result should be on node1 or node2") + + // no nodes available + iterator = getNodeIteratorFn() + result = app.tryNodesInParallel(ask, iterator(), 2) + assert.Assert(t, result == nil, "result should be nil since no nodes are available") + + // request larger than node capacity + largeRes := resources.NewResourceFromMap(map[string]resources.Quantity{"first": 10}) + largeAsk := newAllocationAsk("large-ask", appID0, largeRes) + iterator = getNodeIteratorFn(node1, node2) + result = app.tryNodesInParallel(largeAsk, iterator(), 2) + assert.Assert(t, result == nil, "result should be nil since request is larger than node capacity") +} diff --git a/pkg/scheduler/objects/preemption_test.go b/pkg/scheduler/objects/preemption_test.go index 4cbf939a0..7a3e09109 100644 --- a/pkg/scheduler/objects/preemption_test.go +++ b/pkg/scheduler/objects/preemption_test.go @@ -166,7 +166,7 @@ func TestCheckPreconditions(t *testing.T) { return node } preemptionAttemptsRemaining := 1 - result := app.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2}), true, 1*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) + result := app.tryAllocate(resources.NewResourceFromMap(map[string]resources.Quantity{"first": 2}), true, 1, 1*time.Second, &preemptionAttemptsRemaining, iterator, iterator, getNode) assert.Check(t, result == nil, "unexpected result") assertAllocationLog(t, ask) ask.preemptCheckTime = time.Now().Add(-1 * time.Minute) diff --git a/pkg/scheduler/objects/queue.go b/pkg/scheduler/objects/queue.go index a81b1bfae..14b6e2dc3 100644 --- a/pkg/scheduler/objects/queue.go +++ b/pkg/scheduler/objects/queue.go @@ -1430,7 +1430,8 @@ func (sq *Queue) canRunApp(appID string) bool { // resources are skipped. // Applications are sorted based on the application sortPolicy. Applications without pending resources are skipped. // Lock free call this all locks are taken when needed in called functions -func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() NodeIterator, getnode func(string) *Node, allowPreemption bool) *AllocationResult { +func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() NodeIterator, + getnode func(string) *Node, allowPreemption bool, tryNodesThreadCount int) *AllocationResult { if sq.IsLeafQueue() { // get the headroom headRoom := sq.getHeadRoom() @@ -1445,13 +1446,16 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N if app.IsAccepted() && (!runnableInQueue || !runnableByUserLimit) { continue } - result := app.tryAllocate(headRoom, allowPreemption, preemptionDelay, &preemptAttemptsRemaining, iterator, fullIterator, getnode) + startTime := time.Now() + result := app.tryAllocate(headRoom, allowPreemption, tryNodesThreadCount, preemptionDelay, + &preemptAttemptsRemaining, iterator, fullIterator, getnode) if result != nil { log.Log(log.SchedQueue).Info("allocation found on queue", zap.String("queueName", sq.QueuePath), zap.String("appID", app.ApplicationID), zap.Stringer("resultType", result.ResultType), - zap.Stringer("allocation", result.Request)) + zap.Stringer("allocation", result.Request), + zap.Int64("timeTakenInMicroSecs:", time.Since(startTime).Microseconds())) // if the app is still in Accepted state we're allocating placeholders. // we want to count these apps as running if app.IsAccepted() { @@ -1463,7 +1467,7 @@ func (sq *Queue) TryAllocate(iterator func() NodeIterator, fullIterator func() N } else { // process the child queues (filters out queues without pending requests) for _, child := range sq.sortQueues() { - result := child.TryAllocate(iterator, fullIterator, getnode, allowPreemption) + result := child.TryAllocate(iterator, fullIterator, getnode, allowPreemption, tryNodesThreadCount) if result != nil { return result } diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go index 96c589436..4cf93c55f 100644 --- a/pkg/scheduler/partition.go +++ b/pkg/scheduler/partition.go @@ -67,6 +67,7 @@ type PartitionContext struct { preemptionEnabled bool // whether preemption is enabled or not foreignAllocs map[string]*objects.Allocation // foreign (non-Yunikorn) allocations appQueueMapping *objects.AppQueueMapping // appID mapping to queues + tryNodesThreadCount int // number of threads executing tryNode concurrently // The partition write lock must not be held while manipulating an application. // Scheduling is running continuously as a lock free background task. Scheduling an application @@ -139,6 +140,7 @@ func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionCon pc.userGroupCache = security.GetUserGroupCache(conf.UserGroupResolver, security.GetConfigReader(), security.GetLdapAccess()) pc.updateNodeSortingPolicy(conf, silence) pc.updatePreemption(conf) + pc.updateTryNodesThreadCount(conf) // update limit settings: start at the root if !silence { @@ -168,6 +170,10 @@ func (pc *PartitionContext) updatePreemption(conf configs.PartitionConfig) { pc.preemptionEnabled = conf.Preemption.Enabled == nil || *conf.Preemption.Enabled } +func (pc *PartitionContext) updateTryNodesThreadCount(conf configs.PartitionConfig) { + pc.tryNodesThreadCount = conf.TryNodesThreadCount +} + func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) error { // the following piece of code (before pc.Lock()) must be performed without locking // to avoid lock order differences between PartitionContext and AppPlacementManager @@ -185,6 +191,7 @@ func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) pc.Lock() defer pc.Unlock() pc.updatePreemption(conf) + pc.updateTryNodesThreadCount(conf) // start at the root: there is only one queue queueConf := conf.Queues[0] root := pc.root @@ -818,7 +825,7 @@ func (pc *PartitionContext) tryAllocate() *objects.AllocationResult { return nil } // try allocating from the root down - result := pc.root.TryAllocate(pc.GetNodeIterator, pc.GetFullNodeIterator, pc.GetNode, pc.IsPreemptionEnabled()) + result := pc.root.TryAllocate(pc.GetNodeIterator, pc.GetFullNodeIterator, pc.GetNode, pc.IsPreemptionEnabled(), pc.getTryNodesThreadCount()) if result != nil { return pc.allocate(result) } @@ -1647,6 +1654,12 @@ func (pc *PartitionContext) IsPreemptionEnabled() bool { return pc.preemptionEnabled } +func (pc *PartitionContext) getTryNodesThreadCount() int { + pc.RLock() + defer pc.RUnlock() + return pc.tryNodesThreadCount +} + func (pc *PartitionContext) moveTerminatedApp(appID string) { app := pc.getApplication(appID) // nothing to do if the app is not found on the partition