Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1715,3 +1715,31 @@ func (pc *PartitionContext) getReservationCount() int {
defer pc.RUnlock()
return pc.reservations
}

// GetOrderLog returns a snapshot of the applications that currently have pending
// resources, grouped by the queue path reported by each application.
//
// The partition read lock is held for the whole call. Every returned entry carries
// one queue path and the IDs of the applications in that queue for which
// resources.StrictlyGreaterThanZero is true on the pending resource. Queues without
// any such application do not appear in the result, so an empty (non-nil) slice is
// returned when nothing is pending.
//
// NOTE(review): the entries are produced by ranging over queueAppMap, which is a Go
// map, so the order of the returned entries is not deterministic. The application
// IDs inside an entry follow the iteration order of pc.applications (presumably a
// map as well - confirm), so they do not reflect scheduling order either.
func (pc *PartitionContext) GetOrderLog() []*dao.OrderLogEntry {
pc.RLock()
defer pc.RUnlock()

// Build a map of queue path -> IDs of the applications with pending requests
queueAppMap := make(map[string][]string)

for _, app := range pc.applications {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Map containing applications does not guarantee specific order. Since we are looking for scheduling order at any given moment, I would rather log into data structure which strictly maintains order based on the insertion order so that it can consumed as is in the state dump. I can see Queue.TryAllocate as the best place to push the application into the above mentioned slice.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, Good Catch. Not sure how I missed that. Let me fix it.

// Only include apps with pending resources
if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
queuePath := app.GetQueuePath()
// append on a missing key starts from a nil slice, no explicit init needed
queueAppMap[queuePath] = append(queueAppMap[queuePath], app.ApplicationID)
}
}

// Convert map to slice: one entry per queue, pre-sized to the number of queues
result := make([]*dao.OrderLogEntry, 0, len(queueAppMap))
for queueName, appIDs := range queueAppMap {
result = append(result, &dao.OrderLogEntry{
QueueName: queueName,
ApplicationIDs: appIDs,
})
}

return result
}
154 changes: 154 additions & 0 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4957,3 +4957,157 @@ func TestApplicationBackoff(t *testing.T) {
assert.Assert(t, !deadline.IsZero())
assert.Assert(t, deadline.After(beforeAlloc))
}

// TestGetPartitionQueueDAOInfo builds a partition with a root -> parent -> leaf
// queue hierarchy and checks that GetPartitionQueues reports the same three levels
// with fully qualified queue names.
func TestGetPartitionQueueDAOInfo(t *testing.T) {
	// assemble the hierarchy bottom-up: leaf, then parent, then root
	leafCfg := configs.QueueConfig{Name: "leaf", Parent: false}
	parentCfg := configs.QueueConfig{
		Name:   "parent",
		Parent: true,
		Queues: []configs.QueueConfig{leafCfg},
	}
	rootCfg := configs.QueueConfig{
		Name:      "root",
		Parent:    true,
		SubmitACL: "*",
		Queues:    []configs.QueueConfig{parentCfg},
	}
	conf := configs.PartitionConfig{
		Name:   "default",
		Queues: []configs.QueueConfig{rootCfg},
	}

	partition, err := newPartitionContext(conf, "test-rm", nil, true)
	assert.NilError(t, err, "partition create failed")

	// top level: the partition name and the root queue with a single child
	rootInfo := partition.GetPartitionQueues()
	assert.Equal(t, "default", rootInfo.Partition)
	assert.Equal(t, "root", rootInfo.QueueName)
	assert.Equal(t, 1, len(rootInfo.Children))

	// second level: the parent queue with a single child
	parentInfo := rootInfo.Children[0]
	assert.Equal(t, "root.parent", parentInfo.QueueName)
	assert.Equal(t, 1, len(parentInfo.Children))

	// third level: the leaf queue without children
	leafInfo := parentInfo.Children[0]
	assert.Equal(t, "root.parent.leaf", leafInfo.QueueName)
	assert.Equal(t, 0, len(leafInfo.Children))
}

// TestOrderLog exercises PartitionContext.GetOrderLog on a partition with two leaf
// queues: it checks the empty state, applications showing up once they have a
// pending ask, grouping per queue, an application dropping out after its ask is
// removed, and that an earlier snapshot is not changed by later additions.
func TestOrderLog(t *testing.T) {
	// Create partition with multiple queues
	conf := configs.PartitionConfig{
		Name: "test",
		Queues: []configs.QueueConfig{
			{
				Name:      "root",
				Parent:    true,
				SubmitACL: "*",
				Queues: []configs.QueueConfig{
					{Name: "default", Parent: false},
					{Name: "production", Parent: false},
				},
			},
		},
	}
	partition, err := newPartitionContext(conf, rmID, nil, false)
	assert.NilError(t, err, "partition create failed")

	// Empty partition should have empty order log
	orderLog := partition.GetOrderLog()
	assert.Equal(t, 0, len(orderLog), "initial order log should be empty")

	// Add app1 to default queue with pending request
	res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 100, "vcore": 10})
	app1 := newApplication(appID1, "default", "root.default")
	err = partition.AddApplication(app1)
	assert.NilError(t, err, "failed to add app1")
	err = app1.AddAllocationAsk(newAllocationAsk("alloc-1", appID1, res))
	assert.NilError(t, err, "failed to add ask to app1")

	// Order log should show app1 in default queue
	orderLog = partition.GetOrderLog()
	assert.Equal(t, 1, len(orderLog), "should have 1 queue entry")
	assert.Equal(t, "root.default", orderLog[0].QueueName)
	assert.Equal(t, 1, len(orderLog[0].ApplicationIDs), "should have 1 app")
	assert.Equal(t, appID1, orderLog[0].ApplicationIDs[0])

	// Add app2 to same queue
	app2 := newApplication(appID2, "default", "root.default")
	err = partition.AddApplication(app2)
	assert.NilError(t, err, "failed to add app2")
	err = app2.AddAllocationAsk(newAllocationAsk("alloc-2", appID2, res))
	assert.NilError(t, err, "failed to add ask to app2")

	// Order log should show both apps in default queue; the order of the IDs is not
	// guaranteed, so only membership is checked
	orderLog = partition.GetOrderLog()
	assert.Equal(t, 1, len(orderLog), "should have 1 queue entry")
	assert.Equal(t, 2, len(orderLog[0].ApplicationIDs), "should have 2 apps")
	assert.Assert(t, contains(orderLog[0].ApplicationIDs, appID1), "should contain app1")
	assert.Assert(t, contains(orderLog[0].ApplicationIDs, appID2), "should contain app2")

	// Add app3 to production queue
	app3 := newApplication(appID3, "production", "root.production")
	err = partition.AddApplication(app3)
	assert.NilError(t, err, "failed to add app3")
	err = app3.AddAllocationAsk(newAllocationAsk("alloc-3", appID3, res))
	assert.NilError(t, err, "failed to add ask to app3")

	// Order log should show both queues
	orderLog = partition.GetOrderLog()
	assert.Equal(t, 2, len(orderLog), "should have 2 queue entries")
	queueNames := make([]string, len(orderLog))
	for i, entry := range orderLog {
		queueNames[i] = entry.QueueName
	}
	assert.Assert(t, contains(queueNames, "root.default"), "should contain default queue")
	assert.Assert(t, contains(queueNames, "root.production"), "should contain production queue")

	// Remove allocation from app1 - it should disappear from order log
	app1.RemoveAllocationAsk("alloc-1")
	orderLog = partition.GetOrderLog()
	assert.Equal(t, 2, len(orderLog), "should still have 2 queue entries")
	// track whether the default queue entry was seen: without this the assertions
	// inside the loop would be skipped silently if the entry were missing
	found := false
	for _, entry := range orderLog {
		if entry.QueueName == defQueue {
			found = true
			assert.Equal(t, 1, len(entry.ApplicationIDs), "default queue should have 1 app")
			assert.Equal(t, appID2, entry.ApplicationIDs[0], "should only have app2")
		}
	}
	assert.Assert(t, found, "default queue entry missing after removing app1 ask")

	// Test snapshot immutability
	snapshot1 := partition.GetOrderLog()
	app4 := newApplication("app-4", "default", "root.default")
	err = partition.AddApplication(app4)
	assert.NilError(t, err, "failed to add app4")
	err = app4.AddAllocationAsk(newAllocationAsk("alloc-4", "app-4", res))
	assert.NilError(t, err, "failed to add ask to app4")
	snapshot2 := partition.GetOrderLog()

	// First snapshot should not be affected by new app
	found = false
	for _, entry := range snapshot1 {
		if entry.QueueName == defQueue {
			found = true
			assert.Equal(t, 1, len(entry.ApplicationIDs), "snapshot1 should still have 1 app in default")
		}
	}
	assert.Assert(t, found, "default queue entry missing from snapshot1")

	// Second snapshot should have the new app
	found = false
	for _, entry := range snapshot2 {
		if entry.QueueName == defQueue {
			found = true
			assert.Equal(t, 2, len(entry.ApplicationIDs), "snapshot2 should have 2 apps in default")
		}
	}
	assert.Assert(t, found, "default queue entry missing from snapshot2")
}

// contains reports whether str occurs in slice. A nil or empty slice never
// contains anything.
func contains(slice []string, str string) bool {
	found := false
	// stop scanning as soon as a match has been seen
	for i := 0; i < len(slice) && !found; i++ {
		found = slice[i] == str
	}
	return found
}
25 changes: 25 additions & 0 deletions pkg/webservice/dao/order_log_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dao

// OrderLogEntry is the DAO for a single queue in the order log: the queue name
// together with the applications in that queue that have pending requests.
type OrderLogEntry struct {
	// QueueName is the name of the queue this entry belongs to.
	QueueName string `json:"queueName"`

	// ApplicationIDs holds the IDs of the applications with pending allocation requests.
	ApplicationIDs []string `json:"applicationIDs"`
}
11 changes: 11 additions & 0 deletions pkg/webservice/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,6 +1138,17 @@ func getPartitionQueuesDAO(lists map[string]*scheduler.PartitionContext) []dao.P
return result
}

// getOrderLogDAO collects the order log of every partition in lists.
//
// The result is keyed by the name that common.GetPartitionNameWithoutClusterID
// returns for the partition; each value is the slice returned by that partition's
// GetOrderLog. An empty (non-nil) map is returned when lists is empty.
func getOrderLogDAO(lists map[string]*scheduler.PartitionContext) map[string][]*dao.OrderLogEntry {
	// at most one entry per partition is added, so size the map up front
	result := make(map[string][]*dao.OrderLogEntry, len(lists))

	for _, partition := range lists {
		partitionName := common.GetPartitionNameWithoutClusterID(partition.Name)
		result[partitionName] = partition.GetOrderLog()
	}

	return result
}

func getClusterDAO(lists map[string]*scheduler.PartitionContext) []*dao.ClusterDAOInfo {
result := make([]*dao.ClusterDAOInfo, 0, len(lists))

Expand Down
2 changes: 2 additions & 0 deletions pkg/webservice/state_dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type AggregatedStateInfo struct {
Config *dao.ConfigDAOInfo `json:"config,omitempty"`
PlacementRules []*dao.RuleDAOInfo `json:"placementRules,omitempty"`
EventStreams []events.EventStreamData `json:"eventStreams,omitempty"`
OrderLog map[string][]*dao.OrderLogEntry `json:"orderLog,omitempty"`
}

func getFullStateDump(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -82,6 +83,7 @@ func doStateDump(w io.Writer) error {
Config: getClusterConfigDAO(),
PlacementRules: getPlacementRulesDAO(partitionContext),
EventStreams: events.GetEventSystem().GetEventStreams(),
OrderLog: getOrderLogDAO(partitionContext),
}

var prettyJSON []byte
Expand Down