Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions pkg/scheduler/objects/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,36 @@ func (sq *Queue) applyTemplate(childTemplate *template.Template) {
// getProperties returns a copy of the properties for this queue
// Will never return a nil, can return an empty map.
func (sq *Queue) getProperties() map[string]string {
sq.Lock()
defer sq.Unlock()
props := make(map[string]string)
if sq == nil {
return props
}
sq.RLock()
defer sq.RUnlock()
for key, value := range sq.properties {
props[key] = value
}
return props
}

// GetProperties returns a copy of the properties for this queue.
// Will never return nil; can return an empty map.
func (sq *Queue) GetProperties() map[string]string {
return sq.getProperties()
}

// MergeParentProperties merges the parent queue properties with config properties into this queue.
// Config properties override parent properties. This should be called after ApplyConf during
// config reload to re-apply inherited properties from the parent.
// Lock protected.
func (sq *Queue) MergeParentProperties(config map[string]string) {
// get parent properties outside of the lock to avoid potential deadlocks with parent queue lock.
parentProps := sq.parent.getProperties()
sq.Lock()
defer sq.Unlock()
Comment thread
adityadtu5 marked this conversation as resolved.
sq.mergeProperties(parentProps, config)
}

// mergeProperties merges the properties from the parent queue and the config in the set from new queue
// lock free call
func (sq *Queue) mergeProperties(parent, config map[string]string) {
Expand Down
161 changes: 161 additions & 0 deletions pkg/scheduler/objects/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,167 @@ import (
)

// base test for creating a managed queue
func TestMergeParentPropertiesNilParent(t *testing.T) {
// root queue has no parent, MergeParentProperties should be a no-op
root, err := createRootQueue(nil)
assert.NilError(t, err, "root queue create failed")
root.properties = map[string]string{"key": "value"}

root.MergeParentProperties(map[string]string{"other": "other-value"})

// properties should remain unchanged since root has no parent to merge from
props := root.getProperties()
assert.Equal(t, "value", props["key"], "root queue properties should not be modified by MergeParentProperties")
Comment thread
adityadtu5 marked this conversation as resolved.
Outdated
_, exists := props["other"]
assert.Assert(t, !exists, "config props should not be applied when parent is nil")
}

func TestMergeParentPropertiesParentPropsInherited(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "root queue create failed")
parent, err := createManagedQueueWithProps(root, "parent", true, nil, map[string]string{"inherited-key": "inherited-value"})
assert.NilError(t, err, "parent queue create failed")
child, err := createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "child queue create failed")

child.MergeParentProperties(nil)

props := child.getProperties()
assert.Equal(t, "inherited-value", props["inherited-key"], "child should inherit parent properties")
}

func TestMergeParentPropertiesConfigOverridesParent(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "root queue create failed")
parent, err := createManagedQueueWithProps(root, "parent", true, nil, map[string]string{"key": "parent-value"})
assert.NilError(t, err, "parent queue create failed")
child, err := createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "child queue create failed")

child.MergeParentProperties(map[string]string{"key": "config-value", "extra": "extra-value"})

props := child.getProperties()
assert.Equal(t, "config-value", props["key"], "config property should override parent property")
assert.Equal(t, "extra-value", props["extra"], "config-only property should be present")
}

func TestMergeParentPropertiesClearsOldProperties(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "root queue create failed")
parent, err := createManagedQueue(root, "parent", true, nil)
assert.NilError(t, err, "parent queue create failed")
child, err := createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "child queue create failed")

// manually set a stale property that should be wiped on next merge
child.Lock()
child.properties["stale-key"] = "stale-value"
child.Unlock()

child.MergeParentProperties(map[string]string{"new-key": "new-value"})

props := child.getProperties()
_, exists := props["stale-key"]
assert.Assert(t, !exists, "stale properties should be cleared on MergeParentProperties")
assert.Equal(t, "new-value", props["new-key"], "new config property should be present")
}

func TestMergeParentPropertiesFilterPriorityPolicy(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "root queue create failed")
parent, err := createManagedQueueWithProps(root, "parent", true, nil, map[string]string{
configs.PriorityPolicy: policies.FencePriorityPolicy.String(),
})
assert.NilError(t, err, "parent queue create failed")
child, err := createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "child queue create failed")

child.MergeParentProperties(nil)

props := child.getProperties()
// priority.policy should not be inherited from parent; filterParentProperty resets it to default
assert.Equal(t, policies.DefaultPriorityPolicy.String(), props[configs.PriorityPolicy],
"priority.policy should be reset to default when inherited from parent")
}

func TestMergeParentPropertiesFilterPriorityOffset(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "root queue create failed")
parent, err := createManagedQueueWithProps(root, "parent", true, nil, map[string]string{
configs.PriorityOffset: "10",
})
assert.NilError(t, err, "parent queue create failed")
child, err := createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "child queue create failed")

child.MergeParentProperties(nil)

props := child.getProperties()
// priority.offset should not be inherited; filterParentProperty resets it to "0"
assert.Equal(t, "0", props[configs.PriorityOffset],
"priority.offset should be reset to 0 when inherited from parent")
}

func TestMergeParentPropertiesFilterPreemptionPolicyDisabledPropagates(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "root queue create failed")
parent, err := createManagedQueueWithProps(root, "parent", true, nil, map[string]string{
configs.PreemptionPolicy: policies.DisabledPreemptionPolicy.String(),
})
assert.NilError(t, err, "parent queue create failed")
child, err := createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "child queue create failed")

child.MergeParentProperties(nil)

props := child.getProperties()
// disabled preemption.policy is the only value that propagates from parent
assert.Equal(t, policies.DisabledPreemptionPolicy.String(), props[configs.PreemptionPolicy],
"disabled preemption.policy should propagate from parent")
}

func TestMergeParentPropertiesFilterPreemptionPolicyNonDisabledReset(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "root queue create failed")
parent, err := createManagedQueueWithProps(root, "parent", true, nil, map[string]string{
configs.PreemptionPolicy: policies.FencePreemptionPolicy.String(),
})
assert.NilError(t, err, "parent queue create failed")
child, err := createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "child queue create failed")

child.MergeParentProperties(nil)

props := child.getProperties()
// non-disabled preemption.policy should not propagate from parent; reset to default
assert.Equal(t, policies.DefaultPreemptionPolicy.String(), props[configs.PreemptionPolicy],
"non-disabled preemption.policy should be reset to default when inherited from parent")
}

func TestMergeParentPropertiesConfigCanOverrideFilteredProps(t *testing.T) {
root, err := createRootQueue(nil)
assert.NilError(t, err, "root queue create failed")
parent, err := createManagedQueueWithProps(root, "parent", true, nil, map[string]string{
configs.PriorityPolicy: policies.FencePriorityPolicy.String(),
configs.PriorityOffset: "10",
})
assert.NilError(t, err, "parent queue create failed")
child, err := createManagedQueue(parent, "leaf", false, nil)
assert.NilError(t, err, "child queue create failed")

// config explicitly sets these, so they should not be overridden by the parent filter
child.MergeParentProperties(map[string]string{
configs.PriorityPolicy: policies.FencePriorityPolicy.String(),
configs.PriorityOffset: "5",
})

props := child.getProperties()
assert.Equal(t, policies.FencePriorityPolicy.String(), props[configs.PriorityPolicy],
"config priority.policy should override filtered parent value")
assert.Equal(t, "5", props[configs.PriorityOffset],
"config priority.offset should override filtered parent value")
}

func TestQueueBasics(t *testing.T) {
// create the root
root, err := createRootQueue(nil)
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ func (pc *PartitionContext) updateQueues(config []configs.QueueConfig, parent *o
queue, err = objects.NewConfiguredQueue(queueConfig, parent, false, pc.appQueueMapping)
} else {
oldMax, err = queue.ApplyConf(queueConfig)
if err == nil {
// Re-apply inherited properties from parent, mirroring the NewConfiguredQueue path.
// ApplyConf sets sq.properties to only the queue's own config properties, which
// drops any previously inherited values and prevents parent property changes from
// propagating to existing child queues on config reload.
queue.MergeParentProperties(queueConfig.Properties)
}
}
if err != nil {
return err
Expand Down
60 changes: 60 additions & 0 deletions pkg/scheduler/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,66 @@ func TestUpdateQueues(t *testing.T) {
assertUpdateQueues(t, "both", map[string]string{})
}

// TestUpdateQueuesInheritedProperties verifies that a child queue inherits
// properties from its parent on config reload (updateQueues path).
func TestUpdateQueuesInheritedProperties(t *testing.T) {
const customProp = "custom.test.property"

initialConf := []configs.QueueConfig{
{
Name: "parent",
Parent: true,
Properties: map[string]string{
customProp: "value1",
},
Queues: []configs.QueueConfig{
{
Name: "leaf",
Parent: false,
},
},
},
}

partition, err := newBasePartition()
assert.NilError(t, err, "partition create failed")
root := partition.GetQueue("root")
assert.Assert(t, root != nil, "root queue not found")

// initial load: leaf should inherit the property from parent
err = partition.updateQueues(initialConf, root)
assert.NilError(t, err, "initial updateQueues failed")

leaf := partition.GetQueue("root.parent.leaf")
assert.Assert(t, leaf != nil, "leaf queue should exist")
assert.Equal(t, leaf.GetProperties()[customProp], "value1",
"leaf should inherit property from parent on initial load")

// config reload: parent changes property value; leaf should pick up the new value
updatedConf := []configs.QueueConfig{
{
Name: "parent",
Parent: true,
Properties: map[string]string{
customProp: "value2",
},
Queues: []configs.QueueConfig{
{
Name: "leaf",
Parent: false,
},
},
},
}
err = partition.updateQueues(updatedConf, root)
assert.NilError(t, err, "config-reload updateQueues failed")

leaf = partition.GetQueue("root.parent.leaf")
assert.Assert(t, leaf != nil, "leaf queue should still exist after reload")
assert.Equal(t, leaf.GetProperties()[customProp], "value2",
"leaf should inherit updated property from parent on config reload")
}

func TestReAddQueues(t *testing.T) {
conf := []configs.QueueConfig{
{
Expand Down
Loading