Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,22 @@ func (sq *Queue) getProperties() map[string]string {
return props
}

// GetProperties returns a copy of the properties for this queue.
// Will never return nil; can return an empty map.
func (sq *Queue) GetProperties() map[string]string {
return sq.getProperties()
}

// MergeProperties merges the parent queue properties with config properties into this queue.
// Config properties override parent properties. This should be called after ApplyConf during
// config reload to re-apply inherited properties from the parent.
// Lock protected.
func (sq *Queue) MergeProperties(parent, config map[string]string) {
Comment thread
adityadtu5 marked this conversation as resolved.
Outdated
sq.Lock()
defer sq.Unlock()
Comment thread
adityadtu5 marked this conversation as resolved.
sq.mergeProperties(parent, config)
}

// mergeProperties merges the properties from the parent queue and the config in the set from new queue
// lock free call
func (sq *Queue) mergeProperties(parent, config map[string]string) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ func (pc *PartitionContext) updateQueues(config []configs.QueueConfig, parent *o
queue, err = objects.NewConfiguredQueue(queueConfig, parent, false, pc.appQueueMapping)
} else {
oldMax, err = queue.ApplyConf(queueConfig)
if err == nil {
// Re-apply inherited properties from parent, mirroring the NewConfiguredQueue path.
// ApplyConf sets sq.properties to only the queue's own config properties, which
// drops any previously inherited values and prevents parent property changes from
// propagating to existing child queues on config reload.
queue.MergeProperties(parent.GetProperties(), queueConfig.Properties)
}
}
if err != nil {
return err
Expand Down
60 changes: 60 additions & 0 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,66 @@ func TestUpdateQueues(t *testing.T) {
assertUpdateQueues(t, "both", map[string]string{})
}

// TestUpdateQueuesInheritedProperties verifies that a child queue inherits
// properties from its parent on config reload (updateQueues path).
func TestUpdateQueuesInheritedProperties(t *testing.T) {
const customProp = "custom.test.property"

initialConf := []configs.QueueConfig{
{
Name: "parent",
Parent: true,
Properties: map[string]string{
customProp: "value1",
},
Queues: []configs.QueueConfig{
{
Name: "leaf",
Parent: false,
},
},
},
}

partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
root := partition.GetQueue("root")
assert.Assert(t, root != nil, "root queue not found")

// initial load: leaf should inherit the property from parent
err = partition.updateQueues(initialConf, root)
assert.NilError(t, err, "initial updateQueues failed")

leaf := partition.GetQueue("root.parent.leaf")
assert.Assert(t, leaf != nil, "leaf queue should exist")
assert.Equal(t, leaf.GetProperties()[customProp], "value1",
"leaf should inherit property from parent on initial load")

// config reload: parent changes property value; leaf should pick up the new value
updatedConf := []configs.QueueConfig{
{
Name: "parent",
Parent: true,
Properties: map[string]string{
customProp: "value2",
},
Queues: []configs.QueueConfig{
{
Name: "leaf",
Parent: false,
},
},
},
}
err = partition.updateQueues(updatedConf, root)
assert.NilError(t, err, "config-reload updateQueues failed")

leaf = partition.GetQueue("root.parent.leaf")
assert.Assert(t, leaf != nil, "leaf queue should still exist after reload")
assert.Equal(t, leaf.GetProperties()[customProp], "value2",
"leaf should inherit updated property from parent on config reload")
}

func TestReAddQueues(t *testing.T) {
conf := []configs.QueueConfig{
{
Expand Down
Loading