diff --git a/test/unit/configs/test_config_naming.py b/test/unit/configs/test_config_naming.py index 0e9ec58fdc1..f82dc1dfc23 100644 --- a/test/unit/configs/test_config_naming.py +++ b/test/unit/configs/test_config_naming.py @@ -11,34 +11,30 @@ import pytest -class TestConfigNaming: - """Test Config parameter names with underscores and dashes.""" - - def test_flow_completes(self, config_naming_run): - """Test that the flow completes successfully.""" - assert config_naming_run.successful - assert config_naming_run.finished - - def test_config_with_underscore(self, config_naming_run): - """Test Config with underscore in name.""" - end_task = config_naming_run["end"].task - - assert end_task["underscore_test"].data == "underscore" - assert end_task["underscore_value"].data == 42 - assert end_task["underscore_dict"].data == {"test": "underscore", "value": 42} - - def test_config_with_dash(self, config_naming_run): - """Test Config with dash in name.""" - end_task = config_naming_run["end"].task - - assert end_task["dash_test"].data == "dash" - assert end_task["dash_value"].data == 99 - assert end_task["dash_dict"].data == {"test": "dash", "value": 99} - - def test_config_with_mixed_naming(self, config_naming_run): - """Test Config with both underscores and dashes in name.""" - end_task = config_naming_run["end"].task - - assert end_task["mixed_test"].data == "mixed" - assert end_task["mixed_value"].data == 123 - assert end_task["mixed_dict"].data == {"test": "mixed", "value": 123} +def test_flow_completes(config_naming_run): + """Test that the configuration parsing flow completes successfully.""" + assert config_naming_run.successful + assert config_naming_run.finished + + +@pytest.mark.parametrize( + "prefix, expected_text, expected_value", + [ + ("underscore", "underscore", 42), + ("dash", "dash", 99), + ("mixed", "mixed", 123), + ], + ids=["underscore_naming", "dash_naming", "mixed_naming"], +) +def test_config_parameter_naming_formats( + config_naming_run, prefix, expected_text, expected_value +): + """Test that configuration parameters parse correctly across different naming conventions.""" + end_task = config_naming_run["end"].task + + assert end_task[f"{prefix}_test"].data == expected_text + assert end_task[f"{prefix}_value"].data == expected_value + assert end_task[f"{prefix}_dict"].data == { + "test": expected_text, + "value": expected_value, + } diff --git a/test/unit/configs/test_config_plain.py b/test/unit/configs/test_config_plain.py index f9b95ba71b2..57c5edb547f 100644 --- a/test/unit/configs/test_config_plain.py +++ b/test/unit/configs/test_config_plain.py @@ -10,50 +10,43 @@ import pytest -class TestConfigPlain: - """Test Config with plain=True option.""" - - def test_flow_completes(self, config_plain_run): - """Test that the flow completes successfully.""" - assert config_plain_run.successful - assert config_plain_run.finished - - def test_plain_string_without_parser(self, config_plain_run): - """Test plain Config without parser returns raw string.""" - end_task = config_plain_run["end"].task - - # Verify it's a string - assert end_task["plain_str_type"].data == "str" - - # Verify the value is the raw string (not parsed JSON) - assert end_task["plain_str_value"].data == '{"raw": "string", "number": 123}' - - def test_plain_list_with_parser(self, config_plain_run): - """Test plain Config with parser returning list (non-dict).""" - end_task = config_plain_run["end"].task - - # Verify it's a list - assert end_task["plain_list_type"].data == "list" - - # Verify the list contents - assert end_task["plain_list_value"].data == [ - "apple", - "banana", - "cherry", - "date", - ] - assert end_task["plain_list_length"].data == 4 - assert end_task["plain_list_first"].data == "apple" - - def test_plain_tuple_with_parser(self, config_plain_run): - """Test plain Config with parser returning tuple (non-dict).""" - end_task = config_plain_run["end"].task - - # Verify it's a tuple type - assert end_task["plain_tuple_type"].data == "tuple" - - # Verify tuple contents - assert end_task["plain_tuple_value"].data == ("test_tuple", 42, True) - assert end_task["tuple_name"].data == "test_tuple" - assert end_task["tuple_count"].data == 42 - assert end_task["tuple_enabled"].data == True +def test_flow_completes(config_plain_run): + """Test that the configuration plain parsing flow completes successfully.""" + assert config_plain_run.successful + assert config_plain_run.finished + + +@pytest.mark.parametrize( + "key_prefix, expected_type, expected_value", + [ + ("plain_str", "str", '{"raw": "string", "number": 123}'), + ("plain_list", "list", ["apple", "banana", "cherry", "date"]), + ("plain_tuple", "tuple", ("test_tuple", 42, True)), + ], + ids=["raw_string", "parsed_list", "parsed_tuple"], +) +def test_plain_config_types_and_values( + config_plain_run, key_prefix, expected_type, expected_value +): + """Test that plain Config fields yield the expected type and exact raw or parsed values.""" + end_task = config_plain_run["end"].task + + assert end_task[f"{key_prefix}_type"].data == expected_type + assert end_task[f"{key_prefix}_value"].data == expected_value + + +def test_plain_list_properties(config_plain_run): + """Test detailed extraction and length properties of a plain parsed list.""" + end_task = config_plain_run["end"].task + + assert end_task["plain_list_length"].data == 4 + assert end_task["plain_list_first"].data == "apple" + + +def test_plain_tuple_properties(config_plain_run): + """Test detailed extraction and inner structures of a plain parsed tuple.""" + end_task = config_plain_run["end"].task + + assert end_task["tuple_name"].data == "test_tuple" + assert end_task["tuple_count"].data == 42 + assert end_task["tuple_enabled"].data is True diff --git a/test/unit/graph_inference/test_card_dag.py b/test/unit/graph_inference/test_card_dag.py index bdce054c24c..7c98fd19398 100644 --- a/test/unit/graph_inference/test_card_dag.py +++ b/test/unit/graph_inference/test_card_dag.py @@ -12,6 +12,7 @@ """ import json +import pytest from metaflow.plugins.cards.card_modules.basic import ( DefaultCardJSON, @@ -20,6 +21,7 @@ def _find_components_by_type(node, component_type): + """Recursively search for components of a specific type in a card JSON structure.""" if isinstance(node, dict): if node.get("type") == component_type: yield node @@ -31,12 +33,14 @@ def _find_components_by_type(node, component_type): # --------------------------------------------------------------------------- -# transform_flow_graph: shape-detection unit tests +# Fixtures # --------------------------------------------------------------------------- -def test_transform_flow_graph_supports_explicit_endpoints(): - graph = { +@pytest.fixture +def explicit_endpoints_graph(): + """Provides a fresh graph definition with custom explicit start/end steps.""" + return { "start_step": "begin", "end_step": "finish", "steps": { @@ -46,7 +50,23 @@ def test_transform_flow_graph_supports_explicit_endpoints(): }, } - transformed = transform_flow_graph(graph) + +@pytest.fixture +def legacy_graph(): + """Provides a fresh legacy graph definition relying on hardcoded keys.""" + return { + "start": {"type": "start", "next": ["end"], "doc": ""}, + "end": {"type": "end", "next": [], "doc": ""}, + } + + +# --------------------------------------------------------------------------- +# transform_flow_graph: shape-detection unit tests +# --------------------------------------------------------------------------- + + +def test_transform_flow_graph_supports_explicit_endpoints(explicit_endpoints_graph): + transformed = transform_flow_graph(explicit_endpoints_graph) assert transformed["start_step"] == "begin" assert transformed["end_step"] == "finish" @@ -56,13 +76,8 @@ def test_transform_flow_graph_supports_explicit_endpoints(): assert transformed["steps"]["finish"]["type"] == "end" -def test_transform_flow_graph_keeps_legacy_start_end_detection(): - graph = { - "start": {"type": "start", "next": ["end"], "doc": ""}, - "end": {"type": "end", "next": [], "doc": ""}, - } - - transformed = transform_flow_graph(graph) +def test_transform_flow_graph_keeps_legacy_start_end_detection(legacy_graph): + transformed = transform_flow_graph(legacy_graph) assert transformed["start_step"] == "start" assert transformed["end_step"] == "end" diff --git a/test/unit/graph_inference/test_graph_inference.py b/test/unit/graph_inference/test_graph_inference.py index f68d6833bfb..46dbe1ec7c1 100644 --- a/test/unit/graph_inference/test_graph_inference.py +++ b/test/unit/graph_inference/test_graph_inference.py @@ -9,29 +9,127 @@ - Single-step flows execute end-to-end """ +import pytest from metaflow.events import Trigger # --------------------------------------------------------------------------- -# Custom named flow (begin/middle/finish) +# Shared Shape Tests (Parametrized) # --------------------------------------------------------------------------- -def test_custom_named_flow_completes(custom_named_run): - assert custom_named_run.successful - assert custom_named_run.finished +@pytest.mark.parametrize( + "run_fixture", + [ + "custom_named_run", + "single_step_run", + "single_step_bare_run", + "custom_branch_run", + "single_step_with_config_run", + "single_step_with_stacked_decos_run", + "single_step_with_flow_mutator_run", + ], + ids=[ + "custom_named", + "single_step", + "single_step_bare", + "branch", + "config", + "stacked_decos", + "flow_mutator", + ], +) +def test_flow_completes_successfully(run_fixture, request): + """Verify that various flow configurations execute end-to-end.""" + run = request.getfixturevalue(run_fixture) + assert run.successful + assert run.finished + + +@pytest.mark.parametrize( + "run_fixture, expected_start, expected_end", + [ + ("custom_named_run", "begin", "finish"), + ("single_step_run", "only", "only"), + ("single_step_bare_run", "only", "only"), + ("custom_branch_run", "entry", "done"), + ], + ids=["custom_named", "single_step", "single_step_bare", "branch"], +) +def test_graph_info_endpoints(run_fixture, expected_start, expected_end, request): + """Verify _graph_info captures the correct explicit start/end steps.""" + run = request.getfixturevalue(run_fixture) + graph_info = run["_parameters"].task["_graph_info"].data + assert graph_info["start_step"] == expected_start + assert graph_info["end_step"] == expected_end + + +@pytest.mark.parametrize( + "run_fixture, expected_start, expected_end", + [ + ("custom_named_run", "begin", "finish"), + ("single_step_run", "only", "only"), + ("single_step_bare_run", "only", "only"), + ], + ids=["custom_named", "single_step", "single_step_bare"], +) +def test_parameters_metadata_endpoints( + run_fixture, expected_start, expected_end, request +): + """Verify metadata reflects the start/end step parameters correctly.""" + run = request.getfixturevalue(run_fixture) + meta = run["_parameters"].task.metadata_dict + assert meta.get("start_step") == expected_start + assert meta.get("end_step") == expected_end + + +@pytest.mark.parametrize( + "run_fixture, expected_val", + [ + ("custom_named_run", 3), + ("single_step_run", 42), + ("single_step_bare_run", 42), + ], + ids=["custom_named", "single_step", "single_step_bare"], +) +def test_end_task_data_value(run_fixture, expected_val, request): + """Verify the primary test artifact is correctly set in the terminal task.""" + run = request.getfixturevalue(run_fixture) + assert run.end_task is not None + assert run.end_task["x"].data == expected_val + + +@pytest.mark.parametrize( + "run_fixture, expected_steps", + [ + ("custom_named_run", {"begin", "middle", "finish"}), + ("single_step_run", {"only"}), + ("single_step_bare_run", {"only"}), + ("custom_branch_run", {"entry", "a", "b", "merge", "done"}), + ], + ids=["custom_named", "single_step", "single_step_bare", "branch"], +) +def test_steps_present(run_fixture, expected_steps, request): + """Verify the client API surfaces all executed step IDs correctly.""" + run = request.getfixturevalue(run_fixture) + assert {step.id for step in run} == expected_steps + + +@pytest.mark.parametrize( + "run_fixture", + ["single_step_run", "single_step_bare_run"], + ids=["single_step", "single_step_bare"], +) +def test_single_step_parent_child_empty(run_fixture, request): + """Single-step flows should have no parent or child relationships.""" + run = request.getfixturevalue(run_fixture) + assert list(run["only"].parent_steps) == [] + assert list(run["only"].child_steps) == [] -def test_custom_named_graph_info_has_endpoints(custom_named_run): - graph_info = custom_named_run["_parameters"].task["_graph_info"].data - assert graph_info["start_step"] == "begin" - assert graph_info["end_step"] == "finish" - - -def test_custom_named_parameters_metadata_has_endpoints(custom_named_run): - meta = custom_named_run["_parameters"].task.metadata_dict - assert meta.get("start_step") == "begin" - assert meta.get("end_step") == "finish" +# --------------------------------------------------------------------------- +# Flow-Specific Edge Cases & Composition +# --------------------------------------------------------------------------- def test_custom_named_graph_endpoints_property(custom_named_run): @@ -40,17 +138,6 @@ def test_custom_named_graph_endpoints_property(custom_named_run): assert end == "finish" -def test_custom_named_end_task(custom_named_run): - end_task = custom_named_run.end_task - assert end_task is not None - assert end_task["x"].data == 3 - - -def test_custom_named_steps_present(custom_named_run): - step_names = {step.id for step in custom_named_run} - assert step_names == {"begin", "middle", "finish"} - - def test_custom_named_parent_steps(custom_named_run): assert list(custom_named_run["begin"].parent_steps) == [] assert [step.id for step in custom_named_run["middle"].parent_steps] == ["begin"] @@ -63,98 +150,7 @@ def test_custom_named_child_steps(custom_named_run): assert list(custom_named_run["finish"].child_steps) == [] -# --------------------------------------------------------------------------- -# Single-step flow (start == end) -# --------------------------------------------------------------------------- - - -def test_single_step_flow_completes(single_step_run): - assert single_step_run.successful - assert single_step_run.finished - - -def test_single_step_graph_info_start_equals_end(single_step_run): - graph_info = single_step_run["_parameters"].task["_graph_info"].data - assert graph_info["start_step"] == "only" - assert graph_info["end_step"] == "only" - assert graph_info["start_step"] == graph_info["end_step"] - - -def test_single_step_parameters_metadata(single_step_run): - meta = single_step_run["_parameters"].task.metadata_dict - assert meta.get("start_step") == "only" - assert meta.get("end_step") == "only" - - -def test_single_step_end_task(single_step_run): - end_task = single_step_run.end_task - assert end_task is not None - assert end_task["x"].data == 42 - - -def test_single_step_present(single_step_run): - assert {step.id for step in single_step_run} == {"only"} - - -def test_single_step_parent_child_empty(single_step_run): - assert list(single_step_run["only"].parent_steps) == [] - assert list(single_step_run["only"].child_steps) == [] - - -# --------------------------------------------------------------------------- -# Single-step flow with bare @step (implicit start == end) -# --------------------------------------------------------------------------- - - -def test_single_step_bare_flow_completes(single_step_bare_run): - assert single_step_bare_run.successful - assert single_step_bare_run.finished - - -def test_single_step_bare_graph_info_start_equals_end(single_step_bare_run): - graph_info = single_step_bare_run["_parameters"].task["_graph_info"].data - assert graph_info["start_step"] == "only" - assert graph_info["end_step"] == "only" - - -def test_single_step_bare_parameters_metadata(single_step_bare_run): - meta = single_step_bare_run["_parameters"].task.metadata_dict - assert meta.get("start_step") == "only" - assert meta.get("end_step") == "only" - - -def test_single_step_bare_end_task(single_step_bare_run): - end_task = single_step_bare_run.end_task - assert end_task is not None - assert end_task["x"].data == 42 - - -def test_single_step_bare_step_present(single_step_bare_run): - assert {step.id for step in single_step_bare_run} == {"only"} - - -def test_single_step_bare_parent_child_empty(single_step_bare_run): - assert list(single_step_bare_run["only"].parent_steps) == [] - assert list(single_step_bare_run["only"].child_steps) == [] - - -# --------------------------------------------------------------------------- -# Custom branch flow (entry/a/b/merge/done) -# --------------------------------------------------------------------------- - - -def test_branch_flow_completes(custom_branch_run): - assert custom_branch_run.successful - assert custom_branch_run.finished - - -def test_branch_graph_info_endpoints(custom_branch_run): - graph_info = custom_branch_run["_parameters"].task["_graph_info"].data - assert graph_info["start_step"] == "entry" - assert graph_info["end_step"] == "done" - - -def test_branch_end_task(custom_branch_run): +def test_branch_end_task_exists(custom_branch_run): assert custom_branch_run.end_task is not None @@ -163,16 +159,6 @@ def test_branch_merge_data(custom_branch_run): assert sorted(merge_task["vals"].data) == ["a", "b"] -def test_branch_steps_present(custom_branch_run): - assert {step.id for step in custom_branch_run} == { - "entry", - "a", - "b", - "merge", - "done", - } - - def test_branch_entry_has_two_children(custom_branch_run): children = [step.id for step in custom_branch_run["entry"].child_steps] assert sorted(children) == ["a", "b"] @@ -184,7 +170,7 @@ def test_branch_merge_has_two_parents(custom_branch_run): # --------------------------------------------------------------------------- -# Trigger integration +# Trigger Integration # --------------------------------------------------------------------------- @@ -198,28 +184,16 @@ def test_trigger_from_runs_uses_custom_terminal_step(custom_named_run): # --------------------------------------------------------------------------- -# Composition: single-step flows with Config, stacked decorators, FlowMutator +# Composition Specific Tests # --------------------------------------------------------------------------- -def test_single_step_with_config_completes(single_step_with_config_run): - """Config-bearing single-step flow runs to completion.""" - assert single_step_with_config_run.successful - assert single_step_with_config_run.finished - - def test_single_step_with_config_value_flows_to_artifact(single_step_with_config_run): """Config descriptor value is readable from the end task's artifact.""" end_task = single_step_with_config_run.end_task assert end_task["v"].data == 7 -def test_single_step_with_stacked_decos_completes(single_step_with_stacked_decos_run): - """Single-step flow with stacked @retry/@resources runs end-to-end.""" - assert single_step_with_stacked_decos_run.successful - assert single_step_with_stacked_decos_run.finished - - def test_single_step_with_stacked_decos_graph_info(single_step_with_stacked_decos_run): """_graph_info records all stacked decorators on the only step.""" graph_info = ( @@ -229,12 +203,6 @@ def test_single_step_with_stacked_decos_graph_info(single_step_with_stacked_deco assert {"retry", "resources"}.issubset(names) -def test_single_step_with_flow_mutator_completes(single_step_with_flow_mutator_run): - """FlowMutator-decorated single-step flow runs end-to-end.""" - assert single_step_with_flow_mutator_run.successful - assert single_step_with_flow_mutator_run.finished - - def test_single_step_with_flow_mutator_applied(single_step_with_flow_mutator_run): """FlowMutator.add_decorator landed @retry on the only step.""" graph_info = ( diff --git a/test/unit/inheritance/test_inheritance.py b/test/unit/inheritance/test_inheritance.py index d10956bb2dc..9dc60b926a6 100644 --- a/test/unit/inheritance/test_inheritance.py +++ b/test/unit/inheritance/test_inheritance.py @@ -13,323 +13,354 @@ import pytest -class TestComprehensiveLinear: - """Test comprehensive linear inheritance: FlowSpec -> BaseA -> BaseB -> BaseC -> Flow""" +# --------------------------------------------------------------------------- +# Linear Inheritance Tests (FlowSpec -> BaseA -> BaseB -> BaseC -> Flow) +# --------------------------------------------------------------------------- - def test_flow_completes(self, comprehensive_linear_run): - """Test that the flow completes successfully""" - assert comprehensive_linear_run.successful - assert comprehensive_linear_run.finished - def test_all_parameters_accessible(self, comprehensive_linear_run): - """Test that parameters from all levels are accessible""" - end_task = comprehensive_linear_run["end"].task +def test_linear_all_parameters_accessible(comprehensive_linear_run): + """Test that parameters from all levels are accessible in linear inheritance.""" + end_task = comprehensive_linear_run["end"].task - # From BaseA - assert end_task["result_alpha"].data == 10 - assert end_task["result_beta"].data == 5 + # From BaseA + assert end_task["result_alpha"].data == 10 + assert end_task["result_beta"].data == 5 - # From BaseC - assert end_task["result_gamma"].data == 2.5 + # From BaseC + assert end_task["result_gamma"].data == 2.5 - # From final class - assert end_task["result_delta"].data == "final" + # From final class + assert end_task["result_delta"].data == "final" - def test_all_configs_accessible(self, comprehensive_linear_run): - """Test that configs from all levels are accessible""" - end_task = comprehensive_linear_run["end"].task - # From BaseB - config_b = end_task["result_config_b"].data - assert config_b["multiplier"] == 3 - assert config_b["offset"] == 100 +def test_linear_all_configs_accessible(comprehensive_linear_run): + """Test that configs from all levels are accessible in linear inheritance.""" + end_task = comprehensive_linear_run["end"].task - # From BaseC - config_c = end_task["result_config_c"].data - assert config_c["mode"] == "production" - assert config_c["debug"] is False + # From BaseB + config_b = end_task["result_config_b"].data + assert config_b["multiplier"] == 3 + assert config_b["offset"] == 100 - def test_computation_with_configs(self, comprehensive_linear_run): - """Test computation using inherited parameters and configs""" - end_task = comprehensive_linear_run["end"].task + # From BaseC + config_c = end_task["result_config_c"].data + assert config_c["mode"] == "production" + assert config_c["debug"] is False - # start_value = alpha + beta = 10 + 5 = 15 - # processed_value = start_value * multiplier + offset = 15 * 3 + 100 = 145 - assert end_task["result_final"].data == 145 +def test_linear_computation_with_configs(comprehensive_linear_run): + """Test computation using inherited parameters and configs.""" + end_task = comprehensive_linear_run["end"].task -class TestMutatorWithBaseConfig: - """Test FlowMutator using config from base class""" + # start_value = alpha + beta = 10 + 5 = 15 + # processed_value = start_value * multiplier + offset = 15 * 3 + 100 = 145 + assert end_task["result_final"].data == 145 - def test_flow_completes(self, mutator_with_base_config_run): - """Test that flow completes successfully""" - assert mutator_with_base_config_run.successful - assert mutator_with_base_config_run.finished - def test_base_parameters_accessible(self, mutator_with_base_config_run): - """Test that base parameters are accessible""" - start_task = mutator_with_base_config_run["start"].task +# --------------------------------------------------------------------------- +# FlowMutator with Base Class Config Tests +# --------------------------------------------------------------------------- - assert start_task["result_base_param"].data == "base" - assert start_task["result_middle_param"].data == 100 - assert start_task["result_final_param"].data == 50 - def test_base_config_accessible(self, mutator_with_base_config_run): - """Test that config from base class is accessible""" - start_task = mutator_with_base_config_run["start"].task +def test_mutator_base_config_parameters_accessible(mutator_with_base_config_run): + """Test that base parameters are accessible when using a base config mutator.""" + start_task = mutator_with_base_config_run["start"].task - config = start_task["result_mutator_config"].data - assert config["param_to_inject"] == "dynamic_param" - assert config["default_value"] == 777 - assert config["inject_count"] == 42 + assert start_task["result_base_param"].data == "base" + assert start_task["result_middle_param"].data == 100 + assert start_task["result_final_param"].data == 50 - def test_mutator_injects_from_base_config(self, mutator_with_base_config_run): - """Test that mutator injects parameters based on base config""" - start_task = mutator_with_base_config_run["start"].task - # These parameters should be injected by the mutator based on mutator_config - assert start_task["result_dynamic_param"].data == 777 - assert start_task["result_injected_count"].data == 42 +def test_mutator_base_config_accessible(mutator_with_base_config_run): + """Test that config from base class is accessible.""" + start_task = mutator_with_base_config_run["start"].task - def test_computation_with_injected_params(self, mutator_with_base_config_run): - """Test computation using injected parameters""" - start_task = mutator_with_base_config_run["start"].task + config = start_task["result_mutator_config"].data + assert config["param_to_inject"] == "dynamic_param" + assert config["default_value"] == 777 + assert config["inject_count"] == 42 - # result_computation = middle_param + dynamic_param + injected_count - # = 100 + 777 + 42 = 919 - assert start_task["result_computation"].data == 919 +def test_mutator_injects_from_base_config(mutator_with_base_config_run): + """Test that mutator injects parameters based on base config.""" + start_task = mutator_with_base_config_run["start"].task -class TestMutatorWithDerivedConfig: - """Test FlowMutator in base class using config from derived class""" + # These parameters should be injected by the mutator based on mutator_config + assert start_task["result_dynamic_param"].data == 777 + assert start_task["result_injected_count"].data == 42 - def test_flow_completes(self, mutator_with_derived_config_run): - """Test that flow completes successfully""" - assert mutator_with_derived_config_run.successful - assert mutator_with_derived_config_run.finished - def test_all_parameters_accessible(self, mutator_with_derived_config_run): - """Test that all parameters from hierarchy are accessible""" - start_task = mutator_with_derived_config_run["start"].task +def test_mutator_base_config_computation_with_injected_params( + mutator_with_base_config_run, +): + """Test computation using parameters injected from base config.""" + start_task = mutator_with_base_config_run["start"].task - assert start_task["result_base_param"].data == "base_value" - assert start_task["result_middle_param"].data == 200 - assert start_task["result_final_param"].data == 999 + # result_computation = middle_param + dynamic_param + injected_count + # = 100 + 777 + 42 = 919 + assert start_task["result_computation"].data == 919 - def test_all_configs_accessible(self, mutator_with_derived_config_run): - """Test that all configs from hierarchy are accessible""" - start_task = mutator_with_derived_config_run["start"].task - middle_config = start_task["result_middle_config"].data - assert middle_config["env"] == "staging" +# --------------------------------------------------------------------------- +# FlowMutator with Derived Class Config Tests +# --------------------------------------------------------------------------- - runtime_config = start_task["result_runtime_config"].data - assert runtime_config["features"] == ["logging", "metrics"] - assert runtime_config["worker_count"] == 16 - def test_base_mutator_uses_derived_config(self, mutator_with_derived_config_run): - """Test that base class mutator injects parameters from derived config""" - start_task = mutator_with_derived_config_run["start"].task +def test_mutator_derived_config_all_parameters_accessible( + mutator_with_derived_config_run, +): + """Test that all parameters from hierarchy are accessible when using derived config.""" + start_task = mutator_with_derived_config_run["start"].task - # These parameters should be injected by base mutator using derived runtime_config - assert start_task["result_feature_logging"].data is True - assert start_task["result_feature_metrics"].data is True - assert start_task["result_worker_count"].data == 16 + assert start_task["result_base_param"].data == "base_value" + assert start_task["result_middle_param"].data == 200 + assert start_task["result_final_param"].data == 999 - def test_computation_with_forward_injected_params( - self, mutator_with_derived_config_run - ): - """Test computation using parameters injected from derived config""" - start_task = mutator_with_derived_config_run["start"].task - # result_computation = worker_count * enabled_features + final_param - # enabled_features = feature_logging (True=1) + feature_metrics (True=1) = 2 - # = 16 * 2 + 999 = 1031 - assert start_task["result_computation"].data == 1031 +def test_mutator_derived_config_all_configs_accessible(mutator_with_derived_config_run): + """Test that all configs from hierarchy are accessible.""" + start_task = mutator_with_derived_config_run["start"].task + middle_config = start_task["result_middle_config"].data + assert middle_config["env"] == "staging" -class TestComprehensiveDiamond: - """Test comprehensive diamond inheritance pattern""" + runtime_config = start_task["result_runtime_config"].data + assert runtime_config["features"] == ["logging", "metrics"] + assert runtime_config["worker_count"] == 16 - def test_flow_completes(self, comprehensive_diamond_run): - """Test that diamond inheritance flow completes""" - assert comprehensive_diamond_run.successful - assert comprehensive_diamond_run.finished - def test_parameters_from_all_branches(self, comprehensive_diamond_run): - """Test parameters from all branches are accessible""" - end_task = comprehensive_diamond_run["end"].task +def test_base_mutator_uses_derived_config(mutator_with_derived_config_run): + """Test that base class mutator injects parameters from derived config.""" + start_task = mutator_with_derived_config_run["start"].task - # From BaseA branch - assert end_task["result_param_a"].data == 100 + # These parameters should be injected by base mutator using derived runtime_config + assert start_task["result_feature_logging"].data is True + assert start_task["result_feature_metrics"].data is True + assert start_task["result_worker_count"].data == 16 - # From BaseB branch - assert end_task["result_param_b"].data == 50 - # From BaseC (merge point) - assert end_task["result_param_c"].data == 25 +def test_computation_with_forward_injected_params(mutator_with_derived_config_run): + """Test computation using parameters injected from derived config.""" + start_task = mutator_with_derived_config_run["start"].task - # From final class - assert end_task["result_final_param"].data == "complete" + # result_computation = worker_count * enabled_features + final_param + # enabled_features = feature_logging (True=1) + feature_metrics (True=1) = 2 + # = 16 * 2 + 999 = 1031 + assert start_task["result_computation"].data == 1031 - def test_configs_from_all_branches(self, comprehensive_diamond_run): - """Test configs from all branches are accessible""" - end_task = comprehensive_diamond_run["end"].task - # From BaseA branch - config_a = end_task["result_config_a"].data - assert config_a["branch"] == "A" - assert config_a["priority"] == 1 +# --------------------------------------------------------------------------- +# Comprehensive Diamond Inheritance Tests +# --------------------------------------------------------------------------- - # From BaseB branch - config_b = end_task["result_config_b"].data - assert config_b["branch"] == "B" - assert config_b["weight"] == 2.5 - # From BaseC (merge point) - config_c = end_task["result_config_c"].data - assert config_c["mode"] == "diamond" - assert config_c["enabled"] is True - - def test_mro_resolution(self, comprehensive_diamond_run): - """Test that MRO correctly resolves diamond pattern""" - # If flow completes and uses correct step from BaseA, MRO is working - assert comprehensive_diamond_run.successful - assert "start" in [step.id for step in comprehensive_diamond_run.steps()] - assert "process" in [step.id for step in comprehensive_diamond_run.steps()] +def test_diamond_parameters_from_all_branches(comprehensive_diamond_run): + """Test parameters from all branches of diamond inheritance are accessible.""" + end_task = comprehensive_diamond_run["end"].task - def test_computation_across_branches(self, comprehensive_diamond_run): - """Test computation using values from all branches""" - end_task = comprehensive_diamond_run["end"].task - - # value_a = param_a * priority = 100 * 1 = 100 - # processed = value_a + (param_b * weight) + param_c - # = 100 + (50 * 2.5) + 25 = 100 + 125 + 25 = 250 - assert end_task["result_final"].data == 250 - - -class TestComprehensiveMultiHierarchy: - """Test comprehensive multiple inheritance from independent hierarchies""" - - def test_flow_completes(self, comprehensive_multi_hierarchy_run): - """Test that multi-hierarchy flow completes""" - assert comprehensive_multi_hierarchy_run.successful - assert comprehensive_multi_hierarchy_run.finished - - def test_parameters_from_first_hierarchy(self, comprehensive_multi_hierarchy_run): - """Test parameters from first hierarchy are accessible""" - end_task = comprehensive_multi_hierarchy_run["end"].task - - assert end_task["result_param_a"].data == 10 - assert end_task["result_param_b"].data == 20 - - def test_parameters_from_second_hierarchy(self, comprehensive_multi_hierarchy_run): - """Test parameters from second hierarchy are accessible""" - end_task = comprehensive_multi_hierarchy_run["end"].task - - assert end_task["result_param_x"].data == 30 - assert end_task["result_param_y"].data == 40 - - def test_merge_point_parameters(self, comprehensive_multi_hierarchy_run): - """Test parameters from merge point are accessible""" - end_task = comprehensive_multi_hierarchy_run["end"].task - - assert end_task["result_param_c"].data == 5 - assert end_task["result_final_param"].data == "merged" - - def test_configs_from_both_hierarchies(self, comprehensive_multi_hierarchy_run): - """Test configs from both hierarchies are accessible""" - end_task = comprehensive_multi_hierarchy_run["end"].task - - # First hierarchy - config_a = end_task["result_config_a"].data - assert config_a["source"] == "hierarchy_a" - assert config_a["value"] == 100 - - # Second hierarchy - config_x = end_task["result_config_x"].data - assert config_x["source"] == "hierarchy_x" - assert config_x["multiplier"] == 2 - - config_y = end_task["result_config_y"].data - assert config_y["enabled"] is True - assert config_y["threshold"] == 50 - - # Merge point - config_c = end_task["result_config_c"].data - assert config_c["merge"] is True - assert config_c["offset"] == 200 - - def test_step_override_from_merge_point(self, comprehensive_multi_hierarchy_run): - """Test that BaseC's process step overrides BaseY's process step""" - # If the computation matches BaseC's logic (not BaseY's), override worked - end_task = comprehensive_multi_hierarchy_run["end"].task - - # hierarchy_a_result = param_a + param_b + config_a.value = 10 + 20 + 100 = 130 - # base_value = hierarchy_a_result * multiplier = 130 * 2 = 260 - # Since base_value (260) > threshold (50): - # processed_value = base_value + offset + param_c = 260 + 200 + 5 = 465 - assert end_task["result_final"].data == 465 - - def test_cross_hierarchy_computation(self, comprehensive_multi_hierarchy_run): - """Test computation using values from both hierarchies""" - end_task = comprehensive_multi_hierarchy_run["end"].task - - # Cross-hierarchy sum = param_a + param_b + param_x + param_y + param_c - # = 10 + 20 + 30 + 40 + 5 = 105 - assert end_task["result_cross_hierarchy"].data == 105 - - def test_mutator_from_first_hierarchy_executes( - self, comprehensive_multi_hierarchy_run - ): - end_task = comprehensive_multi_hierarchy_run["end"].task - assert end_task["logging_param_count"].data == 6 - assert end_task["logging_config_count"].data == 4 - - def test_decorated_step_from_first_hierarchy( - self, comprehensive_multi_hierarchy_run - ): - """Test that decorated step from first hierarchy works""" - end_task = comprehensive_multi_hierarchy_run["end"].task - assert end_task["source_from_var"].data == "hierarchy_x" - - -# Integration tests -class TestInheritanceIntegration: - """Integration tests across different inheritance patterns""" - - @pytest.mark.parametrize( - "fixture_name", - [ - "comprehensive_linear_run", - "mutator_with_base_config_run", - "mutator_with_derived_config_run", - "comprehensive_diamond_run", - "comprehensive_multi_hierarchy_run", - ], - ) - def test_all_flows_complete_successfully(self, fixture_name, request): - """Test that all inheritance pattern flows complete successfully""" - run = request.getfixturevalue(fixture_name) - assert run.successful, f"{fixture_name} did not complete successfully" - assert run.finished, f"{fixture_name} did not finish" - - @pytest.mark.parametrize( - "fixture_name,expected_steps", - [ - ("comprehensive_linear_run", ["start", "process", "end"]), - ("mutator_with_base_config_run", ["start", "end"]), - ("mutator_with_derived_config_run", ["start", "end"]), - ("comprehensive_diamond_run", ["start", "process", "end"]), - ("comprehensive_multi_hierarchy_run", ["start", "process", "end"]), - ], - ) - def test_expected_steps_present(self, fixture_name, expected_steps, request): - """Test that all expected steps are present in each flow""" - run = request.getfixturevalue(fixture_name) - step_names = [step.id for step in run.steps()] - - for expected_step in expected_steps: - assert ( - expected_step in step_names - ), f"Step {expected_step} not found in {fixture_name}" + # From BaseA branch + assert end_task["result_param_a"].data == 100 + + # From BaseB branch + assert end_task["result_param_b"].data == 50 + + # From BaseC (merge point) + assert end_task["result_param_c"].data == 25 + + # From final class + assert end_task["result_final_param"].data == "complete" + + +def test_diamond_configs_from_all_branches(comprehensive_diamond_run): + """Test configs from all branches of diamond inheritance are accessible.""" + end_task = comprehensive_diamond_run["end"].task + + # From BaseA branch + config_a = end_task["result_config_a"].data + assert config_a["branch"] == "A" + assert config_a["priority"] == 1 + + # From BaseB branch + config_b = end_task["result_config_b"].data + assert config_b["branch"] == "B" + assert config_b["weight"] == 2.5 + + # From BaseC (merge point) + config_c = end_task["result_config_c"].data + assert config_c["mode"] == "diamond" + assert config_c["enabled"] is True + + +def test_diamond_mro_resolution(comprehensive_diamond_run): + """Test that MRO correctly resolves diamond pattern.""" + # If flow completes and uses correct step from BaseA, MRO is working + assert comprehensive_diamond_run.successful + step_ids = [step.id for step in comprehensive_diamond_run.steps()] + assert "start" in step_ids + assert "process" in step_ids + + +def test_diamond_computation_across_branches(comprehensive_diamond_run): + """Test computation using values from all branches of a diamond structure.""" + end_task = comprehensive_diamond_run["end"].task + + # value_a = param_a * priority = 100 * 1 = 100 + # processed = value_a + (param_b * weight) + param_c + # = 100 + (50 * 2.5) + 25 = 100 + 125 + 25 = 250 + assert end_task["result_final"].data == 250 + + +# --------------------------------------------------------------------------- +# Comprehensive Multiple Inheritance Tests (Independent Hierarchies) +# --------------------------------------------------------------------------- + + +def test_multi_hierarchy_parameters_from_first_hierarchy( + comprehensive_multi_hierarchy_run, +): + """Test parameters from first independent hierarchy are accessible.""" + end_task = comprehensive_multi_hierarchy_run["end"].task + + assert end_task["result_param_a"].data == 10 + assert end_task["result_param_b"].data == 20 + + +def test_multi_hierarchy_parameters_from_second_hierarchy( + comprehensive_multi_hierarchy_run, +): + """Test parameters from second independent hierarchy are accessible.""" + end_task = comprehensive_multi_hierarchy_run["end"].task + + assert end_task["result_param_x"].data == 30 + assert end_task["result_param_y"].data == 40 + + +def test_multi_hierarchy_merge_point_parameters(comprehensive_multi_hierarchy_run): + """Test parameters from hierarchy merge point are accessible.""" + end_task = comprehensive_multi_hierarchy_run["end"].task + + assert end_task["result_param_c"].data == 5 + assert end_task["result_final_param"].data == "merged" + + +def test_multi_hierarchy_configs_from_both_hierarchies( + comprehensive_multi_hierarchy_run, +): + """Test configs from both separate hierarchies are accessible.""" + end_task = comprehensive_multi_hierarchy_run["end"].task + + # First hierarchy + config_a = end_task["result_config_a"].data + assert config_a["source"] == "hierarchy_a" + assert config_a["value"] == 100 + + # Second hierarchy + config_x = end_task["result_config_x"].data + assert config_x["source"] == "hierarchy_x" + assert config_x["multiplier"] == 2 + + config_y = end_task["result_config_y"].data + assert config_y["enabled"] is True + assert config_y["threshold"] == 50 + + # Merge point + config_c = end_task["result_config_c"].data + assert config_c["merge"] is True + assert config_c["offset"] == 200 + + +def test_multi_hierarchy_step_override_from_merge_point( + comprehensive_multi_hierarchy_run, +): + """Test that BaseC's process step overrides BaseY's process step.""" + # If the computation matches BaseC's logic (not BaseY's), override worked + end_task = comprehensive_multi_hierarchy_run["end"].task + + # hierarchy_a_result = param_a + param_b + config_a.value = 10 + 20 + 100 = 130 + # base_value = hierarchy_a_result * multiplier = 130 * 2 = 260 + # Since base_value (260) > threshold (50): + # processed_value = base_value + offset + param_c = 260 + 200 + 5 = 465 + assert end_task["result_final"].data == 465 + + +def test_multi_hierarchy_cross_hierarchy_computation(comprehensive_multi_hierarchy_run): + """Test computation using values combined from both separate hierarchies.""" + end_task = comprehensive_multi_hierarchy_run["end"].task + + # Cross-hierarchy sum = param_a + param_b + param_x + param_y + param_c + # = 10 + 20 + 30 + 40 + 5 = 105 + assert end_task["result_cross_hierarchy"].data == 105 + + +def test_multi_hierarchy_mutator_from_first_hierarchy_executes( + comprehensive_multi_hierarchy_run, +): + """Verify mutator defined in first hierarchy applies context changes successfully.""" + end_task = comprehensive_multi_hierarchy_run["end"].task + assert end_task["logging_param_count"].data == 6 + assert end_task["logging_config_count"].data == 4 + + +def test_multi_hierarchy_decorated_step_from_first_hierarchy( + comprehensive_multi_hierarchy_run, +): + """Test that decorated step from first hierarchy functions correctly.""" + end_task = comprehensive_multi_hierarchy_run["end"].task + assert end_task["source_from_var"].data == "hierarchy_x" + + +# --------------------------------------------------------------------------- +# Integration Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "fixture_name", + [ + "comprehensive_linear_run", + "mutator_with_base_config_run", + "mutator_with_derived_config_run", + "comprehensive_diamond_run", + "comprehensive_multi_hierarchy_run", + ], + ids=[ + "linear_inheritance", + "mutator_base_config", + "mutator_derived_config", + "diamond_inheritance", + "multi_hierarchy", + ], +) +def test_all_flows_complete_successfully(fixture_name, request): + """Test that all structural inheritance pattern flows run completely and successfully.""" + run = request.getfixturevalue(fixture_name) + assert run.successful, f"{fixture_name} did not complete successfully" + assert run.finished, f"{fixture_name} did not finish" + + +@pytest.mark.parametrize( + "fixture_name, expected_steps", + [ + ("comprehensive_linear_run", ["start", "process", "end"]), + ("mutator_with_base_config_run", ["start", "end"]), + ("mutator_with_derived_config_run", ["start", "end"]), + ("comprehensive_diamond_run", ["start", "process", "end"]), + ("comprehensive_multi_hierarchy_run", ["start", "process", "end"]), + ], + ids=[ + "linear_steps", + "mutator_base_steps", + "mutator_derived_steps", + "diamond_steps", + "multi_hierarchy_steps", + ], +) +def test_expected_steps_present(fixture_name, expected_steps, request): + """Test that all expected DAG step structural markers are safely compiled into each run topology.""" + run = request.getfixturevalue(fixture_name) + step_names = [step.id for step in run.steps()] + + for expected_step in expected_steps: + assert ( + expected_step in step_names + ), f"Step {expected_step} not found in {fixture_name}" diff --git a/test/unit/mutators/test_add_decorator_returns.py b/test/unit/mutators/test_add_decorator_returns.py index c7143b4d1f5..92a57bc9b25 100644 --- a/test/unit/mutators/test_add_decorator_returns.py +++ b/test/unit/mutators/test_add_decorator_returns.py @@ -1,26 +1,40 @@ """Tests that MutableStep.add_decorator returns the decorator instance.""" +import pytest -class TestAddDecoratorReturns: - def test_flow_completes(self, add_decorator_return_run): - assert add_decorator_return_run.successful - def test_returned_decorator_is_not_none(self, add_decorator_return_run): - task = add_decorator_return_run["start"].task - assert task.data.returned_is_none is False +@pytest.fixture +def start_task_data(add_decorator_return_run): + """Shared setup: Extracts the data payload from the start task of the test run.""" + return add_decorator_return_run["start"].task.data - def test_returned_decorator_has_name(self, add_decorator_return_run): - task = add_decorator_return_run["start"].task - assert task.data.returned_has_name is True - def test_decorator_was_applied(self, add_decorator_return_run): - task = add_decorator_return_run["start"].task - assert task.data.added_var == "from_mutator" +def test_add_decorator_flow_completes_successfully(add_decorator_return_run): + """Test that the flow modified with add_decorator completes without errors.""" + assert add_decorator_return_run.successful - def test_duplicate_ignore_returns_none(self, add_decorator_return_run): - task = add_decorator_return_run["start"].task - assert task.data.duplicate_is_none is True - def test_duplicate_was_not_applied(self, add_decorator_return_run): - task = add_decorator_return_run["start"].task - assert task.data.should_not_exist is None +def test_add_decorator_returns_instance(start_task_data): + """Test that add_decorator returns an actual object, not None.""" + assert start_task_data.returned_is_none is False + + +def test_returned_decorator_instance_has_name_attribute(start_task_data): + """Test that the returned decorator instance has the expected properties.""" + assert start_task_data.returned_has_name is True + + +def test_added_decorator_executes_and_sets_data(start_task_data): + """Test that the dynamically added decorator was actually executed during the run.""" + # Verify that the variable injected by the dynamically added decorator exists + assert start_task_data.added_var == "from_mutator" + + +def test_adding_duplicate_decorator_with_ignore_returns_none(start_task_data): + """Test that attempting to add a duplicate decorator (when ignored) returns None.""" + assert start_task_data.duplicate_is_none is True + + +def test_ignored_duplicate_decorator_does_not_execute(start_task_data): + """Test that the ignored duplicate decorator does not apply its logic.""" + assert start_task_data.should_not_exist is None diff --git a/test/unit/mutators/test_dual_inheritance.py b/test/unit/mutators/test_dual_inheritance.py index ec7204fd778..0127a1cf487 100644 --- a/test/unit/mutators/test_dual_inheritance.py +++ b/test/unit/mutators/test_dual_inheritance.py @@ -1,27 +1,43 @@ """Tests for dual UserStepDecorator + StepMutator inheritance.""" +import pytest -class TestDualInheritance: - def test_flow_completes(self, dual_inherit_run): - assert dual_inherit_run.successful - - def test_pre_mutate_ran(self, dual_inherit_run): - """pre_mutate() should have added the environment variable.""" - task = dual_inherit_run["start"].task - assert task.data.pre_mutate_env_var == "pre_mutate_ran" - - def test_mutate_ran(self, dual_inherit_run): - """mutate() should have added the environment variable.""" - task = dual_inherit_run["start"].task - assert task.data.mutate_env_var == "hello" - - def test_pre_step_ran(self, dual_inherit_run): - """pre_step() should have set the artifact.""" - task = dual_inherit_run["start"].task - assert task.data.pre_step_ran is True - - def test_post_step_ran(self, dual_inherit_run): - """post_step() should have set the artifact on the start step, - visible in the end step via data propagation.""" - task = dual_inherit_run["end"].task - assert task.data.post_step_ran is True + +@pytest.fixture +def start_task_data(dual_inherit_run): + """Shared setup: Extracts the data payload from the start task of the test run.""" + return dual_inherit_run["start"].task.data + + +@pytest.fixture +def end_task_data(dual_inherit_run): + """Shared setup: Extracts the data payload from the end task of the test run.""" + return dual_inherit_run["end"].task.data + + +def test_dual_inheritance_flow_completes_successfully(dual_inherit_run): + """Test that a flow using a decorator with dual inheritance runs to completion.""" + assert dual_inherit_run.successful + + +def test_pre_mutate_hook_adds_environment_variable(start_task_data): + """Test that the pre_mutate() hook correctly injects its environment variable.""" + assert start_task_data.pre_mutate_env_var == "pre_mutate_ran" + + +def test_mutate_hook_adds_environment_variable(start_task_data): + """Test that the mutate() hook correctly injects its environment variable.""" + assert start_task_data.mutate_env_var == "hello" + + +def test_pre_step_hook_sets_artifact(start_task_data): + """Test that the pre_step() hook executes and successfully sets an artifact.""" + assert start_task_data.pre_step_ran is True + + +def test_post_step_hook_sets_artifact_visible_downstream(end_task_data): + """ + Test that post_step() sets an artifact on the start step, + and verifies it is visible in the end step via data propagation. + """ + assert end_task_data.post_step_ran is True diff --git a/test/unit/mutators/test_flow_mutator_addition.py b/test/unit/mutators/test_flow_mutator_addition.py index c7bafd7b042..0262e176af1 100644 --- a/test/unit/mutators/test_flow_mutator_addition.py +++ b/test/unit/mutators/test_flow_mutator_addition.py @@ -1,35 +1,40 @@ """Tests for dynamically adding FlowMutators via MutableFlow.add_decorator.""" +# --- Adding a FlowMutator by class reference --- -class TestDynamicFlowMutatorAddition: - """Adding a FlowMutator by class reference.""" - def test_flow_completes(self, dynamic_flow_mutator_run): - assert dynamic_flow_mutator_run.successful +def test_dynamic_flow_mutator_completes(dynamic_flow_mutator_run): + """Test that the flow completes successfully when adding a FlowMutator by class reference.""" + assert dynamic_flow_mutator_run.successful - def test_inner_pre_mutate_ran(self, dynamic_flow_mutator_run): - """InnerMutator.pre_mutate should have been called by the ongoing iteration.""" - task = dynamic_flow_mutator_run["start"].task - assert task.data.inner_pre == "inner_pre_mutate_ran" - def test_inner_mutate_ran(self, dynamic_flow_mutator_run): - """InnerMutator.mutate should also have been called.""" - task = dynamic_flow_mutator_run["start"].task - assert task.data.inner_mutate == "inner_mutate_ran" +def test_dynamic_flow_mutator_inner_pre_mutate_ran(dynamic_flow_mutator_run): + """Test InnerMutator.pre_mutate is called by the ongoing iteration.""" + task = dynamic_flow_mutator_run["start"].task + assert task.data.inner_pre == "inner_pre_mutate_ran" -class TestStringFlowMutatorAddition: - """Adding a FlowMutator by string name with arguments.""" +def test_dynamic_flow_mutator_inner_mutate_ran(dynamic_flow_mutator_run): + """Test InnerMutator.mutate is also called by the ongoing iteration.""" + task = dynamic_flow_mutator_run["start"].task + assert task.data.inner_mutate == "inner_mutate_ran" - def test_flow_completes(self, string_flow_mutator_run): - assert string_flow_mutator_run.successful - def test_string_mutator_pre_mutate_ran(self, string_flow_mutator_run): - """StringAddedMutator.pre_mutate should have run with the parsed arg.""" - task = string_flow_mutator_run["start"].task - assert task.data.string_tag == "from_string" +# --- Adding a FlowMutator by string name with arguments --- - def test_string_mutator_mutate_ran(self, string_flow_mutator_run): - """StringAddedMutator.mutate should also have been called.""" - task = string_flow_mutator_run["start"].task - assert task.data.string_mutate == "yes" + +def test_string_flow_mutator_completes(string_flow_mutator_run): + """Test that the flow completes successfully when adding a FlowMutator by string name.""" + assert string_flow_mutator_run.successful + + +def test_string_flow_mutator_pre_mutate_ran(string_flow_mutator_run): + """Test StringAddedMutator.pre_mutate runs with the parsed arg.""" + task = string_flow_mutator_run["start"].task + assert task.data.string_tag == "from_string" + + +def test_string_flow_mutator_mutate_ran(string_flow_mutator_run): + """Test StringAddedMutator.mutate is also called.""" + task = string_flow_mutator_run["start"].task + assert task.data.string_mutate == "yes" diff --git a/test/unit/mutators/test_post_step_none_false.py b/test/unit/mutators/test_post_step_none_false.py index e19eeb5f130..3f906734dea 100644 --- a/test/unit/mutators/test_post_step_none_false.py +++ b/test/unit/mutators/test_post_step_none_false.py @@ -1,18 +1,20 @@ """Regression test for post_step returning (None, False) being a no-op.""" -class TestPostStepNoneFalse: - def test_flow_completes(self, post_step_none_false_run): - """Run completes successfully rather than hitting RuntimeError at - task.py's `Invalid value passed to self.next` branch.""" - assert post_step_none_false_run.successful +def test_post_step_none_false_completes(post_step_none_false_run): + """Run completes successfully rather than hitting RuntimeError at + task.py's `Invalid value passed to self.next` branch.""" + assert post_step_none_false_run.successful - def test_pre_step_ran(self, post_step_none_false_run): - task = post_step_none_false_run["start"].task - assert task.data.pre_step_ran is True - def test_post_step_ran(self, post_step_none_false_run): - """post_step ran and its (None, False) return value was accepted as - a no-op (visible in the end step via data propagation).""" - task = post_step_none_false_run["end"].task - assert task.data.post_step_ran is True +def test_post_step_none_false_pre_step_ran(post_step_none_false_run): + """Verify that the pre_step was executed on the start task.""" + task = post_step_none_false_run["start"].task + assert task.data.pre_step_ran is True + + +def test_post_step_none_false_post_step_ran(post_step_none_false_run): + """post_step ran and its (None, False) return value was accepted as + a no-op (visible in the end step via data propagation).""" + task = post_step_none_false_run["end"].task + assert task.data.post_step_ran is True diff --git a/test/unit/mutators/test_remove_decorator_guard.py b/test/unit/mutators/test_remove_decorator_guard.py index 6cfcc2b2ec3..f888636e546 100644 --- a/test/unit/mutators/test_remove_decorator_guard.py +++ b/test/unit/mutators/test_remove_decorator_guard.py @@ -34,35 +34,42 @@ def end(self): pass -def _make_mutable_step(pre_mutate): - cls = FlowWithStepMutator - step_obj = getattr(cls, "start") - return MutableStep( - cls, - step_obj, - pre_mutate=pre_mutate, - statically_defined=True, - inserted_by=["test"], - ) - - -class TestRemoveDecoratorGuard: - def test_do_all_from_mutate_raises(self): - """Calling remove_decorator(name) without args/kwargs (do_all=True) - from a non-pre_mutate MutableStep must raise on a StepMutator.""" - ms = _make_mutable_step(pre_mutate=False) - with pytest.raises(MetaflowException, match="only allowed in the `pre_mutate`"): - ms.remove_decorator("dummy_step_mutator") - - def test_specific_match_from_mutate_raises(self): - """Same guard applies when an explicit deco_args/deco_kwargs match - is provided (this path already worked before the fix; keep covered).""" - ms = _make_mutable_step(pre_mutate=False) - with pytest.raises(MetaflowException, match="only allowed in the `pre_mutate`"): - ms.remove_decorator("dummy_step_mutator", deco_args=[], deco_kwargs={}) - - def test_do_all_from_pre_mutate_succeeds(self): - """From pre_mutate, removing the StepMutator via do_all should work.""" - ms = _make_mutable_step(pre_mutate=True) - removed = ms.remove_decorator("dummy_step_mutator") - assert removed is True +@pytest.fixture +def make_mutable_step(): + """Factory fixture to create MutableStep instances.""" + + def _make(pre_mutate): + cls = FlowWithStepMutator + step_obj = getattr(cls, "start") + return MutableStep( + cls, + step_obj, + pre_mutate=pre_mutate, + statically_defined=True, + inserted_by=["test"], + ) + + return _make + + +def test_remove_decorator_do_all_from_mutate_raises(make_mutable_step): + """Calling remove_decorator(name) without args/kwargs (do_all=True) + from a non-pre_mutate MutableStep must raise on a StepMutator.""" + ms = make_mutable_step(pre_mutate=False) + with pytest.raises(MetaflowException, match="only allowed in the `pre_mutate`"): + ms.remove_decorator("dummy_step_mutator") + + +def test_remove_decorator_specific_match_from_mutate_raises(make_mutable_step): + """Same guard applies when an explicit deco_args/deco_kwargs match + is provided (this path already worked before the fix; keep covered).""" + ms = make_mutable_step(pre_mutate=False) + with pytest.raises(MetaflowException, match="only allowed in the `pre_mutate`"): + ms.remove_decorator("dummy_step_mutator", deco_args=[], deco_kwargs={}) + + +def test_remove_decorator_do_all_from_pre_mutate_succeeds(make_mutable_step): + """From pre_mutate, removing the StepMutator via do_all should work.""" + ms = make_mutable_step(pre_mutate=True) + removed = ms.remove_decorator("dummy_step_mutator") + assert removed is True diff --git a/test/unit/mutators/test_string_step_mutator.py b/test/unit/mutators/test_string_step_mutator.py index ca205c59534..e5a65565f28 100644 --- a/test/unit/mutators/test_string_step_mutator.py +++ b/test/unit/mutators/test_string_step_mutator.py @@ -1,12 +1,13 @@ """Tests for string-based StepMutator addition via MutableStep.add_decorator.""" -class TestStringStepMutatorAddition: - def test_flow_completes(self, string_step_mutator_run): - assert string_step_mutator_run.successful - - def test_string_step_mutator_ran(self, string_step_mutator_run): - """StepMutator added by string should have its mutate() called, - adding the environment variable.""" - task = string_step_mutator_run["start"].task - assert task.data.step_mutator_val == "from_string" +def test_string_step_mutator_flow_completes(string_step_mutator_run): + """Test that the flow completes successfully when adding a StepMutator by string.""" + assert string_step_mutator_run.successful + + +def test_string_step_mutator_ran(string_step_mutator_run): + """StepMutator added by string should have its mutate() called, + adding the environment variable.""" + task = string_step_mutator_run["start"].task + assert task.data.step_mutator_val == "from_string" diff --git a/test/unit/spin/test_spin.py b/test/unit/spin/test_spin.py index 5ab193bb7de..d8364b72570 100644 --- a/test/unit/spin/test_spin.py +++ b/test/unit/spin/test_spin.py @@ -1,7 +1,13 @@ +import os +import tempfile import pytest + from metaflow import Runner -import os -from spin_test_helpers import assert_artifacts, run_step, FLOWS_DIR, ARTIFACTS_DIR +from spin_test_helpers import run_step, FLOWS_DIR, ARTIFACTS_DIR + +# --------------------------------------------------------------------------- +# Simple Flow Tests +# --------------------------------------------------------------------------- @pytest.mark.parametrize( @@ -14,67 +20,81 @@ ], ids=["merge_artifacts", "simple_config", "simple_parameter", "complex_dag"], ) -def test_simple_flows(flow_file, fixture_name, request): - """Test simple flows that just need artifact validation.""" +def test_simple_flows_validate_artifacts(flow_file, fixture_name, request): + """Test that basic flows run steps correctly and validate their artifacts.""" run = request.getfixturevalue(fixture_name) - print(f"Running test for {flow_file}: {run}") + + # Act & Assert: Iterate through and run each step for step in run.steps(): - print("-" * 100) if fixture_name == "complex_dag_run": run_step(flow_file, run, step.id, environment="conda") else: run_step(flow_file, run, step.id) -def test_artifacts_module(complex_dag_run): - print(f"Running test for artifacts module in ComplexDAGFlow: {complex_dag_run}") +# --------------------------------------------------------------------------- +# Artifacts Module Tests +# --------------------------------------------------------------------------- + + +def test_artifacts_module_evaluates_correctly(complex_dag_run): + """Test that an external artifacts module correctly injects state into a spun step.""" + # Setup step_name = "step_a" task = complex_dag_run[step_name].task flow_path = os.path.join(FLOWS_DIR, "complex_dag_flow.py") artifacts_path = os.path.join(ARTIFACTS_DIR, "complex_dag_step_a.py") + # Act with Runner(flow_path, cwd=FLOWS_DIR, environment="conda").spin( task.pathspec, artifacts_module=artifacts_path, persist=True, ) as spin: - print("-" * 50) - print(f"Running test for step: step_a with task pathspec: {task.pathspec}") + + # Assert spin_task = spin.task - print(f"my_output: {spin_task['my_output']}") assert spin_task["my_output"].data == [10, 11, 12, 3] -def test_artifacts_module_join_step( +def test_artifacts_module_injects_dynamic_data_in_join_step( complex_dag_run, complex_dag_step_d_artifacts, tmp_path ): - print(f"Running test for artifacts module in ComplexDAGFlow: {complex_dag_run}") + """Test that dynamically generated artifacts are correctly loaded during a join step.""" + # Setup step_name = "step_d" task = complex_dag_run[step_name].task flow_path = os.path.join(FLOWS_DIR, "complex_dag_flow.py") - # Create a temporary artifacts file with dynamic data + # Setup: Create a temporary artifacts file with dynamic data temp_artifacts_file = tmp_path / "temp_complex_dag_step_d.py" temp_artifacts_file.write_text(f"ARTIFACTS = {repr(complex_dag_step_d_artifacts)}") + # Act with Runner(flow_path, cwd=FLOWS_DIR, environment="conda").spin( task.pathspec, artifacts_module=str(temp_artifacts_file), persist=True, ) as spin: - print("-" * 50) - print(f"Running test for step: step_d with task pathspec: {task.pathspec}") + + # Assert spin_task = spin.task assert spin_task["my_output"].data == [-1] -def test_timeout_decorator_enforcement(simple_config_run): - """Test that timeout decorator properly enforces timeout limits.""" +# --------------------------------------------------------------------------- +# Decorator & Config Tests +# --------------------------------------------------------------------------- + + +def test_timeout_decorator_enforces_time_limit(simple_config_run): + """Test that a configured timeout decorator stops execution and raises an exception.""" + # Setup step_name = "start" task = simple_config_run[step_name].task flow_path = os.path.join(FLOWS_DIR, "simple_config_flow.py") - # With decorator enabled (should timeout and raise exception) + # Act & Assert with pytest.raises(Exception): with Runner( flow_path, cwd=FLOWS_DIR, config_value=[("config", {"timeout": 2})] @@ -85,13 +105,14 @@ def test_timeout_decorator_enforcement(simple_config_run): pass -def test_skip_decorators_bypass(simple_config_run): - """Test that skip_decorators successfully bypasses timeout decorator.""" +def test_skip_decorators_bypasses_timeout(simple_config_run): + """Test that using skip_decorators=True successfully ignores the timeout limit.""" + # Setup step_name = "start" task = simple_config_run[step_name].task flow_path = os.path.join(FLOWS_DIR, "simple_config_flow.py") - # With skip_decorators=True (should succeed despite timeout) + # Act with Runner( flow_path, cwd=FLOWS_DIR, config_value=[("config", {"timeout": 2})] ).spin( @@ -99,16 +120,18 @@ def test_skip_decorators_bypass(simple_config_run): skip_decorators=True, persist=True, ) as spin: - print(f"Running test for step: {step_name} with skip_decorators=True") - # Should complete successfully even though sleep(5) > timeout(2) - spin_task = spin.task - assert spin_task.finished + + # Assert: Should complete successfully even though step length > timeout + assert spin.task.finished def test_spin_preserves_explicit_top_level_decospecs(spin_decospec_run): + """Test that spin respects top-level decorator specifications provided to Runner.""" + # Setup task = spin_decospec_run["start"].task flow_path = os.path.join(FLOWS_DIR, "spin_decospec_flow.py") + # Act & Assert with pytest.raises(Exception, match="timed out"): with Runner( flow_path, @@ -123,10 +146,13 @@ def test_spin_preserves_explicit_top_level_decospecs(spin_decospec_run): pass -def test_spin_step_does_not_apply_default_decospecs(spin_decospec_run): +def test_spin_step_ignores_default_decospecs(spin_decospec_run): + """Test that spin does NOT inadvertently apply METAFLOW_DEFAULT_DECOSPECS.""" + # Setup task = spin_decospec_run["start"].task flow_path = os.path.join(FLOWS_DIR, "spin_decospec_flow.py") + # Act with Runner( flow_path, cwd=FLOWS_DIR, @@ -137,44 +163,59 @@ def test_spin_step_does_not_apply_default_decospecs(spin_decospec_run): task.pathspec, persist=True, ) as spin: + + # Assert assert spin.task.finished assert spin.task["done"].data is True -def test_hidden_artifacts(simple_parameter_run): - """Test simple flows that just need artifact validation.""" +# --------------------------------------------------------------------------- +# Internal State & Integration Tests +# --------------------------------------------------------------------------- + + +def test_spin_persists_internal_hidden_artifacts(simple_parameter_run): + """Test that spinning a task retains internal Metaflow graph and state artifacts.""" + # Setup step_name = "start" task = simple_parameter_run[step_name].task flow_path = os.path.join(FLOWS_DIR, "simple_parameter_flow.py") - print(f"Running test for hidden artifacts in {flow_path}: {simple_parameter_run}") + # Act with Runner(flow_path, cwd=FLOWS_DIR).spin(task.pathspec, persist=True) as spin: spin_task = spin.task + + # Assert assert "_graph_info" in spin_task assert "_foreach_stack" in spin_task -def test_card_flow(simple_card_run): - """Test a simple flow that has @card decorator.""" +def test_spin_generates_cards_correctly(simple_card_run): + """Test that spinning a flow with the @card decorator successfully outputs cards.""" + # Setup + from metaflow.cards import get_cards + step_name = "start" task = simple_card_run[step_name].task flow_path = os.path.join(FLOWS_DIR, "simple_card_flow.py") - print(f"Running test for cards in {flow_path}: {simple_card_run}") + # Act with Runner(flow_path, cwd=FLOWS_DIR).spin(task.pathspec, persist=True) as spin: - spin_task = spin.task - from metaflow.cards import get_cards + res = get_cards(spin.task, follow_resumed=False) - res = get_cards(spin_task, follow_resumed=False) - print(res) + # Assert + assert res is not None, "Cards collection should be generated and retrievable" + assert len(res) > 0, "Expected at least one card to be present in the results" -def test_spin_with_parameters_raises_error(simple_parameter_run): - """Test that passing flow parameters to spin raises an error.""" +def test_spin_with_flow_parameters_raises_error(simple_parameter_run): + """Test that passing standard flow parameters to spin() raises an Unknown argument error.""" + # Setup step_name = "start" task = simple_parameter_run[step_name].task flow_path = os.path.join(FLOWS_DIR, "simple_parameter_flow.py") + # Act & Assert with pytest.raises(Exception, match="Unknown argument"): with Runner(flow_path, cwd=FLOWS_DIR).spin( task.pathspec, @@ -184,41 +225,42 @@ def test_spin_with_parameters_raises_error(simple_parameter_run): pass -# NOTE: This test has to be the last test because it modifies the metadata -# provider when calling inspect_spin -def test_inspect_spin_client_access(simple_parameter_run): - """Test accessing spin artifacts using inspect_spin client directly.""" +# --------------------------------------------------------------------------- +# WARNING: State-Modifying Test +# This test modifies the global metadata provider via `inspect_spin`. +# It is kept at the bottom of the file to prevent side-effects on other tests. +# --------------------------------------------------------------------------- + + +def test_inspect_spin_client_allows_artifact_access(simple_parameter_run): + """Test accessing spun artifacts directly using the inspect_spin client.""" + # Setup from metaflow import inspect_spin, Task - import tempfile step_name = "start" task = simple_parameter_run[step_name].task flow_path = os.path.join(FLOWS_DIR, "simple_parameter_flow.py") with tempfile.TemporaryDirectory() as _: - # Run spin to generate artifacts + # Setup: Run spin to generate artifacts with Runner(flow_path, cwd=FLOWS_DIR).spin( task.pathspec, persist=True, ) as spin: spin_task = spin.task spin_pathspec = spin_task.pathspec + assert spin_task["a"] is not None assert spin_task["b"] is not None + assert spin_pathspec is not None - assert spin_pathspec is not None - - # Set metadata provider to spin + # Act: Set metadata provider to spin inspect_spin(FLOWS_DIR) client_task = Task(spin_pathspec, _namespace_check=False) - # Verify task is accessible + # Assert: Verify task and artifacts are accessible via client assert client_task is not None - - # Verify artifacts assert hasattr(client_task, "artifacts") - - # Verify artifact data assert client_task.artifacts.a.data == 10 assert client_task.artifacts.b.data == 20 assert client_task.artifacts.alpha.data == 0.05