Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions pkg/common/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ type SchedulerConfig struct {
// - the preemption configuration for the partition
// - user group resolver type (os, ldap, "")
type PartitionConfig struct {
Name string
Queues []QueueConfig
PlacementRules []PlacementRule `yaml:",omitempty" json:",omitempty"`
Limits []Limit `yaml:",omitempty" json:",omitempty"`
Preemption PartitionPreemptionConfig `yaml:",omitempty" json:",omitempty"`
NodeSortPolicy NodeSortingPolicy `yaml:",omitempty" json:",omitempty"`
UserGroupResolver UserGroupResolver `yaml:",omitempty" json:",omitempty"`
Name string
Queues []QueueConfig
PlacementRules []PlacementRule `yaml:",omitempty" json:",omitempty"`
Limits []Limit `yaml:",omitempty" json:",omitempty"`
Preemption PartitionPreemptionConfig `yaml:",omitempty" json:",omitempty"`
NodeSortPolicy NodeSortingPolicy `yaml:",omitempty" json:",omitempty"`
UserGroupResolver UserGroupResolver `yaml:",omitempty" json:",omitempty"`
// TryNodesThreadCount is optional in the YAML and JSON forms (omitempty).
// Validate replaces any value of zero or less with 1.
// NOTE(review): presumably handed to Application.tryAllocate as tryNodesThreadCount, where a
// value above 1 switches node evaluation to tryNodesInParallel - confirm against the caller.
TryNodesThreadCount int `yaml:",omitempty" json:",omitempty"`
}

type UserGroupResolver struct {
Expand Down
5 changes: 5 additions & 0 deletions pkg/common/configs/configvalidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,11 @@ func Validate(newConfig *SchedulerConfig) error {
return fmt.Errorf("duplicate partition name found with name %s", partition.Name)
}
partitionMap[strings.ToLower(partition.Name)] = true

if partition.TryNodesThreadCount <= 0 {
partition.TryNodesThreadCount = 1
}

// check the queue structure
err := checkQueuesStructure(&partition)
if err != nil {
Expand Down
180 changes: 177 additions & 3 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,9 @@ func (sa *Application) canReplace(request *Allocation) bool {
}

// tryAllocate will perform a regular allocation of a pending request, includes placeholders.
func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption bool, preemptionDelay time.Duration, preemptAttemptsRemaining *int, nodeIterator func() NodeIterator, fullNodeIterator func() NodeIterator, getNodeFn func(string) *Node) *AllocationResult {
func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption bool, tryNodesThreadCount int,
preemptionDelay time.Duration, preemptAttemptsRemaining *int, nodeIterator func() NodeIterator,
fullNodeIterator func() NodeIterator, getNodeFn func(string) *Node) *AllocationResult {
sa.Lock()
defer sa.Unlock()
if sa.sortedRequests == nil {
Expand Down Expand Up @@ -1043,8 +1045,15 @@ func (sa *Application) tryAllocate(headRoom *resources.Resource, allowPreemption

iterator := nodeIterator()
if iterator != nil {
if result := sa.tryNodes(request, iterator); result != nil {
// have a candidate return it
var result *AllocationResult

if tryNodesThreadCount > 1 {
result = sa.tryNodesInParallel(request, iterator, tryNodesThreadCount)
} else {
result = sa.tryNodes(request, iterator)
}

if result != nil {
return result
}

Expand Down Expand Up @@ -1460,6 +1469,171 @@ func (sa *Application) tryNodesNoReserve(ask *Allocation, iterator NodeIterator,

// Try all the nodes for a request. The resultType is an allocation or reservation of a node.
// New allocations can only be reserved after a delay.
func (sa *Application) tryNodesInParallel(ask *Allocation, iterator NodeIterator, tryNodesThreadCount int) *AllocationResult { //nolint:funlen
var nodeToReserve *Node
scoreReserved := math.Inf(1)
allocKey := ask.GetAllocationKey()
reserved := sa.reservations[allocKey]
var allocResult *AllocationResult
var predicateErrors map[string]int

var mu sync.Mutex

// Channel to signal completion
done := make(chan struct{})
defer close(done)

// Function to process each batch
processBatch := func(batch []*Node) {
var wg sync.WaitGroup
semaphore := make(chan struct{}, tryNodesThreadCount)
candidateNodes := make([]*Node, len(batch))
errors := make([]error, len(batch))

for idx, node := range batch {
wg.Add(1)
semaphore <- struct{}{}
go func(idx int, node *Node) {
defer wg.Done()
defer func() { <-semaphore }()
dryRunResult, err := sa.tryNodeDryRun(node, ask)

mu.Lock()
defer mu.Unlock()
if err != nil {
errors[idx] = err
} else if dryRunResult != nil {
candidateNodes[idx] = node
}
}(idx, node)
}
Comment thread
pbacsko marked this conversation as resolved.

wg.Wait()

for _, err := range errors {
if err != nil {
mu.Lock()
if predicateErrors == nil {
predicateErrors = make(map[string]int)
}
predicateErrors[err.Error()]++
mu.Unlock()
}
}

// Process dry-run candidateNodes sequentially within the batch
for _, candidateNode := range candidateNodes {
if candidateNode == nil {
continue
}
tryNodeStart := time.Now()
result, err := sa.tryNode(candidateNode, ask)
if err != nil {
if predicateErrors == nil {
predicateErrors = make(map[string]int)
}
predicateErrors[err.Error()]++
} else if result != nil {
metrics.GetSchedulerMetrics().ObserveTryNodeLatency(tryNodeStart)
if reserved != nil {
if reserved.nodeID != candidateNode.NodeID {
log.Log(log.SchedApplication).Debug("allocate picking reserved alloc during non reserved allocate",
zap.String("appID", sa.ApplicationID),
zap.String("reserved nodeID", reserved.nodeID),
zap.String("allocationKey", allocKey))
result.ReservedNodeID = reserved.nodeID
} else {
log.Log(log.SchedApplication).Debug("allocate found reserved alloc during non reserved allocate",
zap.String("appID", sa.ApplicationID),
zap.String("nodeID", candidateNode.NodeID),
zap.String("allocationKey", allocKey))
}
result.ResultType = AllocatedReserved
allocResult = result
return
}
allocResult = result
return
}
askAge := time.Since(ask.GetCreateTime())
if reserved == nil && askAge > reservationDelay {
log.Log(log.SchedApplication).Debug("app reservation check",
zap.String("allocationKey", allocKey),
zap.Time("createTime", ask.GetCreateTime()),
zap.Duration("askAge", askAge),
zap.Duration("reservationDelay", reservationDelay))
score := candidateNode.GetFitInScoreForAvailableResource(ask.GetAllocatedResource())
if score < scoreReserved {
scoreReserved = score
nodeToReserve = candidateNode
}
}
}
}

// Iterate over nodes and process in batches
var batch []*Node
iterator.ForEachNode(func(node *Node) bool {
batch = append(batch, node)
if len(batch) >= tryNodesThreadCount {
processBatch(batch)
batch = nil
if allocResult != nil {
return false // Exit iteration if an allocation has been made
}
}
return true
})
// Process any remaining nodes in the last batch
if len(batch) > 0 && allocResult == nil {
processBatch(batch)
}

if allocResult != nil {
return allocResult
}

if predicateErrors != nil {
ask.SendPredicatesFailedEvent(predicateErrors)
}

if nodeToReserve != nil && !nodeToReserve.IsReserved() {
log.Log(log.SchedApplication).Debug("found candidate node for app reservation",
zap.String("appID", sa.ApplicationID),
zap.String("nodeID", nodeToReserve.NodeID),
zap.String("allocationKey", allocKey),
zap.Int("reservations", len(sa.reservations)))
if nodeToReserve.preReserveConditions(ask) != nil {
return nil
}
return newReservedAllocationResult(nodeToReserve.NodeID, ask)
}

return nil
}

// tryNodeDryRun runs the node checks for the ask without building anything but the result object.
// It returns:
//   - nil, nil when the node is not schedulable, the ask does not fit in the node or the
//     preAllocateCheck for the node fails
//   - nil and the error when preAllocateConditions fails for the ask
//   - a new allocated result for the node and ask when all checks pass
func (sa *Application) tryNodeDryRun(node *Node, ask *Allocation) (*AllocationResult, error) {
	requested := ask.GetAllocatedResource()
	key := ask.GetAllocationKey()

	// the cases are evaluated in order and stop at the first check that rejects the node
	switch {
	case !node.IsSchedulable():
		return nil, nil
	case !node.FitInNode(ask.GetAllocatedResource()):
		return nil, nil
	case !node.preAllocateCheck(requested, key):
		// skip schedule onto node
		return nil, nil
	}

	// skip the node if conditions can not be satisfied
	if err := node.preAllocateConditions(ask); err != nil {
		return nil, err
	}

	return newAllocatedAllocationResult(node.NodeID, ask), nil
}

func (sa *Application) tryNodes(ask *Allocation, iterator NodeIterator) *AllocationResult {
var nodeToReserve *Node
scoreReserved := math.Inf(1)
Expand Down
Loading