diff --git a/dirsrvtests/tests/suites/schema/schema_update_policy_test.py b/dirsrvtests/tests/suites/schema/schema_update_policy_test.py index 16f80c38dd..16aed32025 100644 --- a/dirsrvtests/tests/suites/schema/schema_update_policy_test.py +++ b/dirsrvtests/tests/suites/schema/schema_update_policy_test.py @@ -63,27 +63,6 @@ def _add_oc(instance, oid, name): schema.add_objectclass(params) -@pytest.fixture -def temporary_oc2(topology_m2): - """Fixture to create and automatically clean up OC2UpdatePolicy.""" - supplier1 = topology_m2.ms["supplier1"] - supplier2 = topology_m2.ms["supplier2"] - name = "OC2UpdatePolicy" - - log.info(f"Adding temporary objectclass: {name}") - _add_oc(supplier1, "1.2.3.4.5.6.7.8.9.10.3", name) - - yield name - - log.info(f"Cleaning up temporary objectclass: {name}") - for s in [supplier1, supplier2]: - try: - schema = Schema(s) - schema.remove_objectclass(name) - except (ldap.NO_SUCH_OBJECT, ValueError): - pass - - @pytest.fixture(scope="function") def setup_test_env(request, topology_m2): """Initialize the test environment""" @@ -181,31 +160,34 @@ def test_schema_update_policy_allow(topology_m2, setup_test_env): assert entry.get_attr_val_utf8('description') == 'test_add' -def test_schema_update_policy_reject(topology_m2, setup_test_env, temporary_oc2): +def test_schema_update_policy_reject(topology_m2, setup_test_env): """Test that schema updates can be rejected based on policy :id: d9f4e3b5-6c8b-5b0e-0f3a-9b6c5d8e9g0 :setup: Two supplier replication setup with test entries :steps: - 1. Configure supplier1 to reject schema updates containing OC_NAME - 2. Add a new objectclass on supplier1 - 3. Update an entry to trigger schema push - 4. Verify schema was not pushed to supplier2 - 5. Remove reject policy - 6. Update entry again - 7. Verify schema is now pushed to supplier2 + 1. Configure supplier1 to reject schema updates containing OCUpdatePolicy + 2. Add OC2UpdatePolicy objectclass on supplier1 + 3. Verify OC2UpdatePolicy is NOT on supplier2 before triggering replication + 4. Update an entry to trigger schema push + 5. Verify schema was not pushed to supplier2 + 6. Remove reject policy + 7. Update entry again + 8. Verify schema is now pushed to supplier2 :expectedresults: 1. Policy should be configured 2. New objectclass should be added - 3. Update should trigger replication - 4. Schema should not be pushed due to policy - 5. Policy should be removed - 6. Update should trigger replication - 7. Schema should now be pushed + 3. OC2UpdatePolicy should not be on supplier2 + 4. Update should trigger replication + 5. Schema should not be pushed due to policy + 6. Policy should be removed + 7. Update should trigger replication + 8. Schema should now be pushed """ supplier1 = topology_m2.ms["supplier1"] supplier2 = topology_m2.ms["supplier2"] repl = ReplicationManager(DEFAULT_SUFFIX) + oc2_name = "OC2UpdatePolicy" log.info("Configure supplier to reject schema updates for OCUpdatePolicy") policy_dn = f"cn=supplierUpdatePolicy,cn=replSchema,{supplier1.config.dn}" @@ -214,10 +196,13 @@ def test_schema_update_policy_reject(topology_m2, setup_test_env, temporary_oc2) supplier1.restart() wait_for_attr_value(supplier1, policy_dn, 'schemaUpdateObjectclassReject', 'OCUpdatePolicy') - log.info("Verify OC2UpdatePolicy is in supplier1") + log.info(f"Add {oc2_name} objectclass on supplier1 (after reject policy is active)") + _add_oc(supplier1, "1.2.3.4.5.6.7.8.9.10.3", oc2_name) + + log.info(f"Verify {oc2_name} is in supplier1") schema = Schema(supplier1) schema_attrs = schema.get_objectclasses() - assert any('oc2updatepolicy' in (name.lower() for name in oc.names) for oc in schema_attrs) + assert any(oc2_name.lower() in (name.lower() for name in oc.names) for oc in schema_attrs) log.info("Update entry on supplier1 to trigger schema push") users = UserAccounts(supplier1, DEFAULT_SUFFIX) @@ -230,9 +215,9 @@ def test_schema_update_policy_reject(topology_m2, setup_test_env, temporary_oc2) entry = users2.get('test_entry') assert entry.get_attr_val_utf8('description') == 'test_reject' - log.info("Verify OC2UpdatePolicy was NOT pushed to supplier2") + log.info(f"Verify {oc2_name} was NOT pushed to supplier2") schema_attrs = supplier2.schema.get_objectclasses() - assert not any('oc2updatepolicy' in (name.lower() for name in oc.names) for oc in schema_attrs) + assert not any(oc2_name.lower() in (name.lower() for name in oc.names) for oc in schema_attrs) log.info("Remove reject policy") policy_entry.remove('schemaUpdateObjectclassReject', 'OCUpdatePolicy') @@ -247,9 +232,16 @@ def test_schema_update_policy_reject(topology_m2, setup_test_env, temporary_oc2) entry = users2.get('test_entry') assert entry.get_attr_val_utf8('description') == 'test_no_more_reject' - log.info("Verify OC2UpdatePolicy is now in supplier2") + log.info(f"Verify {oc2_name} is now in supplier2") schema_attrs = supplier2.schema.get_objectclasses() - assert any('oc2updatepolicy' in (name.lower() for name in oc.names) for oc in schema_attrs) + assert any(oc2_name.lower() in (name.lower() for name in oc.names) for oc in schema_attrs) + + log.info(f"Cleaning up {oc2_name}") + for s in [supplier1, supplier2]: + try: + Schema(s).remove_objectclass(oc2_name) + except (ldap.NO_SUCH_OBJECT, ValueError): + pass if __name__ == '__main__':