diff --git a/pkg/scheduler/partition.go b/pkg/scheduler/partition.go
index 96c589436..72c875536 100644
--- a/pkg/scheduler/partition.go
+++ b/pkg/scheduler/partition.go
@@ -1715,3 +1715,37 @@ func (pc *PartitionContext) getReservationCount() int {
 	defer pc.RUnlock()
 	return pc.reservations
 }
+
+// GetOrderLog returns a snapshot of the applications that have pending requests, grouped by queue.
+// An application is listed under its queue path when resources.StrictlyGreaterThanZero accepts its
+// pending resource; queues without such an application get no entry.
+// The entries are emitted by ranging over a map, so their order can differ between calls: callers
+// that need a stable order must sort the result. The IDs inside an entry follow the iteration
+// order of pc.applications.
+// The partition read lock is held while the snapshot is built.
+func (pc *PartitionContext) GetOrderLog() []*dao.OrderLogEntry {
+	pc.RLock()
+	defer pc.RUnlock()
+
+	// Collect the IDs of the applications with pending requests per queue path
+	queueAppMap := make(map[string][]string)
+
+	for _, app := range pc.applications {
+		// Only include apps with pending resources
+		if resources.StrictlyGreaterThanZero(app.GetPendingResource()) {
+			queuePath := app.GetQueuePath()
+			queueAppMap[queuePath] = append(queueAppMap[queuePath], app.ApplicationID)
+		}
+	}
+
+	// Flatten the map into one entry per queue
+	result := make([]*dao.OrderLogEntry, 0, len(queueAppMap))
+	for queueName, appIDs := range queueAppMap {
+		result = append(result, &dao.OrderLogEntry{
+			QueueName:      queueName,
+			ApplicationIDs: appIDs,
+		})
+	}
+
+	return result
+}
diff --git a/pkg/scheduler/partition_test.go b/pkg/scheduler/partition_test.go
index 1ee9a4b3c..d5bec2a3b 100644
--- a/pkg/scheduler/partition_test.go
+++ b/pkg/scheduler/partition_test.go
@@ -4957,3 +4957,168 @@ func TestApplicationBackoff(t *testing.T) {
 	assert.Assert(t, !deadline.IsZero())
 	assert.Assert(t, deadline.After(beforeAlloc))
 }
+
+// TestGetPartitionQueueDAOInfo checks the queue hierarchy returned by GetPartitionQueues for a root -> parent -> leaf config.
+func TestGetPartitionQueueDAOInfo(t *testing.T) {
+	conf := configs.PartitionConfig{
+		Name: "default",
+		Queues: []configs.QueueConfig{
+			{
+				Name:      "root",
+				Parent:    true,
+				SubmitACL: "*",
+				Queues: []configs.QueueConfig{
+					{
+						Name:   "parent",
+						Parent: true,
+						Queues: []configs.QueueConfig{
+							{
+								Name:   "leaf",
+								Parent: false,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	partition, err := newPartitionContext(conf, "test-rm", nil, true)
+	assert.NilError(t, err, "partition create failed")
+
+	daoInfo := partition.GetPartitionQueues()
+
+	assert.Equal(t, "default", daoInfo.Partition)
+	assert.Equal(t, "root", daoInfo.QueueName)
+	assert.Equal(t, 1, len(daoInfo.Children))
+
+	parentDAO := daoInfo.Children[0]
+	assert.Equal(t, "root.parent", parentDAO.QueueName)
+	assert.Equal(t, 1, len(parentDAO.Children))
+
+	leafDAO := parentDAO.Children[0]
+	assert.Equal(t, "root.parent.leaf", leafDAO.QueueName)
+	assert.Equal(t, 0, len(leafDAO.Children))
+}
+
+// TestOrderLog checks that GetOrderLog groups applications with pending asks per queue and returns independent snapshots.
+func TestOrderLog(t *testing.T) {
+	// Create partition with multiple queues
+	conf := configs.PartitionConfig{
+		Name: "test",
+		Queues: []configs.QueueConfig{
+			{
+				Name:      "root",
+				Parent:    true,
+				SubmitACL: "*",
+				Queues: []configs.QueueConfig{
+					{Name: "default", Parent: false},
+					{Name: "production", Parent: false},
+				},
+			},
+		},
+	}
+	partition, err := newPartitionContext(conf, rmID, nil, false)
+	assert.NilError(t, err, "partition create failed")
+
+	// Empty partition should have empty order log
+	orderLog := partition.GetOrderLog()
+	assert.Equal(t, 0, len(orderLog), "initial order log should be empty")
+
+	// Add app1 to default queue with pending request
+	res := resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 100, "vcore": 10})
+	app1 := newApplication(appID1, "default", "root.default")
+	err = partition.AddApplication(app1)
+	assert.NilError(t, err, "failed to add app1")
+	err = app1.AddAllocationAsk(newAllocationAsk("alloc-1", appID1, res))
+	assert.NilError(t, err, "failed to add ask to app1")
+
+	// Order log should show app1 in default queue
+	orderLog = partition.GetOrderLog()
+	assert.Equal(t, 1, len(orderLog), "should have 1 queue entry")
+	assert.Equal(t, "root.default", orderLog[0].QueueName)
+	assert.Equal(t, 1, len(orderLog[0].ApplicationIDs), "should have 1 app")
+	assert.Equal(t, appID1, orderLog[0].ApplicationIDs[0])
+
+	// Add app2 to same queue
+	app2 := newApplication(appID2, "default", "root.default")
+	err = partition.AddApplication(app2)
+	assert.NilError(t, err, "failed to add app2")
+	err = app2.AddAllocationAsk(newAllocationAsk("alloc-2", appID2, res))
+	assert.NilError(t, err, "failed to add ask to app2")
+
+	// Order log should show both apps in default queue
+	orderLog = partition.GetOrderLog()
+	assert.Equal(t, 1, len(orderLog), "should have 1 queue entry")
+	assert.Equal(t, 2, len(orderLog[0].ApplicationIDs), "should have 2 apps")
+	assert.Assert(t, contains(orderLog[0].ApplicationIDs, appID1), "should contain app1")
+	assert.Assert(t, contains(orderLog[0].ApplicationIDs, appID2), "should contain app2")
+
+	// Add app3 to production queue
+	app3 := newApplication(appID3, "production", "root.production")
+	err = partition.AddApplication(app3)
+	assert.NilError(t, err, "failed to add app3")
+	err = app3.AddAllocationAsk(newAllocationAsk("alloc-3", appID3, res))
+	assert.NilError(t, err, "failed to add ask to app3")
+
+	// Order log should show both queues
+	orderLog = partition.GetOrderLog()
+	assert.Equal(t, 2, len(orderLog), "should have 2 queue entries")
+	queueNames := make([]string, len(orderLog))
+	for i, entry := range orderLog {
+		queueNames[i] = entry.QueueName
+	}
+	assert.Assert(t, contains(queueNames, "root.default"), "should contain default queue")
+	assert.Assert(t, contains(queueNames, "root.production"), "should contain production queue")
+
+	// Remove allocation from app1 - it should disappear from order log
+	app1.RemoveAllocationAsk("alloc-1")
+	orderLog = partition.GetOrderLog()
+	assert.Equal(t, 2, len(orderLog), "should still have 2 queue entries")
+	found := false
+	for _, entry := range orderLog {
+		if entry.QueueName == defQueue {
+			found = true
+			assert.Equal(t, 1, len(entry.ApplicationIDs), "default queue should have 1 app")
+			assert.Equal(t, appID2, entry.ApplicationIDs[0], "should only have app2")
+		}
+	}
+	assert.Assert(t, found, "default queue should still be in the order log")
+
+	// Test snapshot immutability
+	snapshot1 := partition.GetOrderLog()
+	app4 := newApplication("app-4", "default", "root.default")
+	err = partition.AddApplication(app4)
+	assert.NilError(t, err, "failed to add app4")
+	err = app4.AddAllocationAsk(newAllocationAsk("alloc-4", "app-4", res))
+	assert.NilError(t, err, "failed to add ask to app4")
+	snapshot2 := partition.GetOrderLog()
+
+	// First snapshot should not be affected by new app
+	found = false
+	for _, entry := range snapshot1 {
+		if entry.QueueName == defQueue {
+			found = true
+			assert.Equal(t, 1, len(entry.ApplicationIDs), "snapshot1 should still have 1 app in default")
+		}
+	}
+	assert.Assert(t, found, "snapshot1 should contain the default queue")
+	// Second snapshot should have the new app
+	found = false
+	for _, entry := range snapshot2 {
+		if entry.QueueName == defQueue {
+			found = true
+			assert.Equal(t, 2, len(entry.ApplicationIDs), "snapshot2 should have 2 apps in default")
+		}
+	}
+	assert.Assert(t, found, "snapshot2 should contain the default queue")
+}
+
+// contains reports whether slice holds an element equal to str.
+func contains(slice []string, str string) bool {
+	for _, s := range slice {
+		if s == str {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/webservice/dao/order_log_info.go b/pkg/webservice/dao/order_log_info.go
new file mode 100644
index 000000000..ad8067e62
--- /dev/null
+++ b/pkg/webservice/dao/order_log_info.go
@@ -0,0 +1,25 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package dao
+
+// OrderLogEntry lists the applications with pending requests in a single queue.
+type OrderLogEntry struct {
+	QueueName      string   `json:"queueName"`      // Queue the applications are grouped under
+	ApplicationIDs []string `json:"applicationIDs"` // Applications with pending allocation requests
+}
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index a76f76c50..0c24ae258 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -1138,6 +1138,18 @@ func getPartitionQueuesDAO(lists map[string]*scheduler.PartitionContext) []dao.P
 	return result
 }
 
+// getOrderLogDAO returns the order log of each partition, keyed by the name from common.GetPartitionNameWithoutClusterID.
+func getOrderLogDAO(lists map[string]*scheduler.PartitionContext) map[string][]*dao.OrderLogEntry {
+	result := make(map[string][]*dao.OrderLogEntry, len(lists))
+
+	for _, partition := range lists {
+		partitionName := common.GetPartitionNameWithoutClusterID(partition.Name)
+		result[partitionName] = partition.GetOrderLog()
+	}
+
+	return result
+}
+
 func getClusterDAO(lists map[string]*scheduler.PartitionContext) []*dao.ClusterDAOInfo {
 	result := make([]*dao.ClusterDAOInfo, 0, len(lists))
 
diff --git a/pkg/webservice/state_dump.go b/pkg/webservice/state_dump.go
index a37ef2cb4..71864edf6 100644
--- a/pkg/webservice/state_dump.go
+++ b/pkg/webservice/state_dump.go
@@ -51,6 +51,7 @@ type AggregatedStateInfo struct {
 	Config           *dao.ConfigDAOInfo               `json:"config,omitempty"`
 	PlacementRules   []*dao.RuleDAOInfo               `json:"placementRules,omitempty"`
 	EventStreams     []events.EventStreamData         `json:"eventStreams,omitempty"`
+	OrderLog         map[string][]*dao.OrderLogEntry  `json:"orderLog,omitempty"`
 }
 
 func getFullStateDump(w http.ResponseWriter, r *http.Request) {
@@ -82,6 +83,7 @@ func doStateDump(w io.Writer) error {
 		Config:           getClusterConfigDAO(),
 		PlacementRules:   getPlacementRulesDAO(partitionContext),
 		EventStreams:     events.GetEventSystem().GetEventStreams(),
+		OrderLog:         getOrderLogDAO(partitionContext),
 	}
 
 	var prettyJSON []byte