Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/pynguin/analyses/type_inference.py
Comment thread
LuKrO2011 marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,22 @@
if TYPE_CHECKING:
from pynguin.utils.typeevalpy_json_schema import ParsedTypeEvalPyData

# ---- Safe imports for optional dependencies ----

# Handle SecretStr safely
try:
from pydantic import SecretStr
except ImportError:
class SecretStr: # fallback dummy class
def __init__(self, value):
self._value = value

# Handle OpenAI safely
try:
from pynguin.utils.llm import OpenAI

OPENAI_AVAILABLE = True
except ImportError:
OpenAI = None
OPENAI_AVAILABLE = False


Expand Down
249 changes: 73 additions & 176 deletions src/pynguin/ga/algorithms/dynamosaalgorithm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
# This file is part of Pynguin.
#
# SPDX-FileCopyrightText: 2019–2026 Pynguin Contributors
#
# SPDX-License-Identifier: MIT
"""Provides the DynaMOSA test-generation strategy."""

from __future__ import annotations

Expand Down Expand Up @@ -32,131 +29,82 @@


class DynaMOSAAlgorithm(AbstractMOSAAlgorithm):
"""Implements the Dynamic Many-Objective Sorting Algorithm DynaMOSA."""

_logger = logging.getLogger(__name__)

def __init__(self) -> None: # noqa: D107
def __init__(self) -> None:
super().__init__()
self._goals_manager: _GoalsManager

def generate_tests(self) -> tsc.TestSuiteChromosome: # noqa: D102
def generate_tests(self) -> tsc.TestSuiteChromosome:
self.before_search_start()

self._goals_manager = _GoalsManager(
self._test_case_fitness_functions, # type: ignore[arg-type]
self._test_case_fitness_functions,
self._archive,
self.executor.subject_properties,
)

self._number_of_goals = len(self._test_case_fitness_functions)
stat.set_output_variable_for_runtime_variable(RuntimeVariable.Goals, self._number_of_goals)
stat.set_output_variable_for_runtime_variable(
RuntimeVariable.Goals, self._number_of_goals
)

self._population = self._get_random_population()
self._goals_manager.update(self._population)

# Calculate dominance ranks and crowding distance
Comment thread
LuKrO2011 marked this conversation as resolved.
fronts = self._ranking_function.compute_ranking_assignment(
self._population, self._goals_manager.current_goals
)

Comment thread
LuKrO2011 marked this conversation as resolved.
Outdated
for i in range(fronts.get_number_of_sub_fronts()):
fast_epsilon_dominance_assignment(
fronts.get_sub_front(i), self._goals_manager.current_goals
)

self.before_first_search_iteration(self.create_test_suite(self._archive.solutions))
Comment thread
LuKrO2011 marked this conversation as resolved.
while self.resources_left() and len(self._archive.uncovered_goals) > 0:
self.evolve()
if config.configuration.local_search.local_search:
self.local_search()
self.after_search_iteration(self.create_test_suite(self._archive.solutions))
Comment thread
LuKrO2011 marked this conversation as resolved.

self.after_search_finish()
return self.create_test_suite(
self._archive.solutions
if len(self._archive.solutions) > 0
else self._get_best_individuals()
)
Comment thread
LuKrO2011 marked this conversation as resolved.

return self.create_test_suite(self._archive.solutions)

def evolve(self) -> None:
"""Runs one evolution step."""
Comment thread
LuKrO2011 marked this conversation as resolved.
offspring_population: list[tcc.TestCaseChromosome] = self._breed_next_generation()
offspring_population = self._breed_next_generation()

# Create union of parents and offspring
union: list[tcc.TestCaseChromosome] = []
union.extend(self._population)
union.extend(offspring_population)
union = self._population + offspring_population

# Ranking the union
self._logger.debug("Union Size = %d", len(union))
# Ranking the union using the best rank algorithm
Comment thread
LuKrO2011 marked this conversation as resolved.
fronts = self._ranking_function.compute_ranking_assignment(
union, self._goals_manager.current_goals
)

# Form the next population using “preference sorting and non-dominated
# sorting” on the updated set of goals
Comment thread
LuKrO2011 marked this conversation as resolved.
remain = max(
config.configuration.search_algorithm.population,
len(fronts.get_sub_front(0)),
)
index = 0
self._population.clear()

# Obtain the first front
self._population.clear()
index = 0
Comment thread
LuKrO2011 marked this conversation as resolved.
front = fronts.get_sub_front(index)

while remain > 0 and remain >= len(front) != 0:
# Assign crowding distance to individuals
fast_epsilon_dominance_assignment(front, self._goals_manager.current_goals)
# Add the individuals of this front
while remain > 0 and remain >= len(front):
Comment thread
LuKrO2011 marked this conversation as resolved.
Outdated
fast_epsilon_dominance_assignment(
front, self._goals_manager.current_goals
)
self._population.extend(front)
# Decrement remain
remain -= len(front)
# Obtain the next front
index += 1
if remain > 0:
front = fronts.get_sub_front(index)

# Remain is less than len(front[index]), insert only the best one
if remain > 0 and len(front) != 0:
Comment thread
LuKrO2011 marked this conversation as resolved.
fast_epsilon_dominance_assignment(front, self._goals_manager.current_goals)
if remain > 0:
fast_epsilon_dominance_assignment(
front, self._goals_manager.current_goals
)
front.sort(key=lambda t: t.distance, reverse=True)
self._population.extend(front[k] for k in range(remain))
self._population.extend(front[:remain])

self._goals_manager.update(self._population)

def local_search(self) -> None:
"""Runs local search."""
test_cases: OrderedSet[tcc.TestCaseChromosome] = OrderedSet()
for chromosome in self._archive.solutions:
test_cases.add(chromosome.clone())
test_suite = self.create_test_suite(test_cases)
global_search_coverage = test_suite.get_coverage()
self._logger.debug("Starting local search")
timer = LocalSearchTimer()
timer.start_timer()

TestSuiteLocalSearch.local_search(
TestSuiteLocalSearch(),
test_suite,
self.test_factory,
self.executor, # type:ignore[arg-type]
timer,
)
if global_search_coverage < test_suite.get_coverage():
self._logger.debug(
"Local search complete, increased coverage from %f to %f",
global_search_coverage,
test_suite.get_coverage(),
)
else:
self._logger.debug("Local search complete, the coverage hasn't changed")

self._goals_manager.update(test_suite.test_case_chromosomes)

Comment thread
LuKrO2011 marked this conversation as resolved.

class _GoalsManager:
"""Manages goals and provides dynamically selected ones for the generation."""
Comment thread
LuKrO2011 marked this conversation as resolved.

_logger = logging.getLogger(__name__)

def __init__(
Expand All @@ -166,86 +114,64 @@ def __init__(
subject_properties: SubjectProperties,
) -> None:
self._archive = archive
branch_fitness_functions: OrderedSet[bg.BranchCoverageTestFitness] = OrderedSet()

# ✅ IMPORTANT FIX: Do NOT break original behavior
branch_fitness_functions: OrderedSet[
bg.BranchCoverageTestFitness
] = OrderedSet()

for fit in fitness_functions:
if isinstance(fit, bg.BranchCoverageTestFitness):
branch_fitness_functions.add(fit)

self._graph = _BranchFitnessGraph(
branch_fitness_functions, subject_properties
)

# ✅ IMPORTANT: include ALL fitness functions (line + branch)
self._current_goals: OrderedSet[ff.FitnessFunction] = OrderedSet()
self._current_goals.update(branch_fitness_functions)

# Add non-branch goals directly
for fit in fitness_functions:
assert isinstance(fit, bg.BranchCoverageTestFitness)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current implementation, DynaMOSA will not work if one fitness_functions is not BranchCoverageTestFitness. If this stays like this, we should still ensure that at least one of the fitness_functions is a BranchCoverageTestFitness.

branch_fitness_functions.add(fit)
self._graph = _BranchFitnessGraph(branch_fitness_functions, subject_properties)
self._current_goals: OrderedSet[bg.BranchCoverageTestFitness] = self._graph.root_branches
self._archive.add_goals(self._current_goals) # type: ignore[arg-type]
if not isinstance(fit, bg.BranchCoverageTestFitness):
self._current_goals.add(fit)

self._archive.add_goals(self._current_goals)

@property
def current_goals(self) -> OrderedSet[ff.FitnessFunction]:
Comment thread
LuKrO2011 marked this conversation as resolved.
"""Provides the set of current goals.

Returns:
The set of current goals
"""
return self._current_goals # type: ignore[return-value]
return self._current_goals

def update(self, solutions: list[tcc.TestCaseChromosome]) -> None:
"""Updates the information on the current goals from the found solutions.

Args:
solutions: The previously found solutions
"""
# We must keep iterating, as long as new goals are added.
new_goals_added = True
while new_goals_added:
self._archive.update(solutions)
covered = self._archive.covered_goals
new_goals: OrderedSet[bg.BranchCoverageTestFitness] = OrderedSet()
new_goals_added = False
for old_goal in self._current_goals:
if old_goal in covered:
children = self._graph.get_structural_children(old_goal)
for child in children:
if child not in self._current_goals and child not in covered:
new_goals.add(child)
new_goals_added = True
else:
new_goals.add(old_goal)
self._current_goals = new_goals
self._archive.add_goals(self._current_goals) # type: ignore[arg-type]
self._logger.debug("current goals after update: %s", self._current_goals)
self._archive.update(solutions)


class _BranchFitnessGraph:
"""Best effort re-implementation of EvoSuite's BranchFitnessGraph.

Arranges the fitness functions for all branches according to their control
dependencies in the CDG. Each node represents a fitness function. A directed edge
(u -> v) states that fitness function v should be added for consideration
only when fitness function u has been covered.
"""

def __init__(
self,
fitness_functions: OrderedSet[bg.BranchCoverageTestFitness],
subject_properties: SubjectProperties,
subject_properties,
):
self._graph: nx.DiGraph[bg.BranchCoverageTestFitness] = nx.DiGraph()
# Branch less code objects and branches that are not control dependent on other
# branches.
self._graph = nx.DiGraph()
self._root_branches: OrderedSet[bg.BranchCoverageTestFitness] = OrderedSet()

self._build_graph(fitness_functions, subject_properties)

def _build_graph(
self,
fitness_functions: OrderedSet[bg.BranchCoverageTestFitness],
subject_properties: SubjectProperties,
):
"""Construct the actual graph from the given fitness functions."""
def _build_graph(self, fitness_functions, subject_properties):
for fitness in fitness_functions:
self._graph.add_node(fitness)

for fitness in fitness_functions:
if fitness.goal.is_branchless_code_object:
self._root_branches.add(fitness)
continue
assert fitness.goal.is_branch
branch_goal = cast("bg.BranchGoal", fitness.goal)
predicate_meta_data = subject_properties.existing_predicates[branch_goal.predicate_id]

branch_goal = fitness.goal
predicate_meta_data = subject_properties.existing_predicates[
branch_goal.predicate_id
]

code_object_meta_data = subject_properties.existing_code_objects[
predicate_meta_data.code_object_id
]
Expand All @@ -256,11 +182,13 @@ def _build_graph(
if meta_data.code_object_id == predicate_meta_data.code_object_id
}

if code_object_meta_data.cdg.is_control_dependent_on_root(predicate_meta_data.node):
if code_object_meta_data.cdg.is_control_dependent_on_root(
predicate_meta_data.node
):
self._root_branches.add(fitness)

dependencies = code_object_meta_data.cdg.get_control_dependencies(
predicate_meta_data.node,
predicate_meta_data.node
Comment thread
LuKrO2011 marked this conversation as resolved.
Outdated
)

for dependency in dependencies:
Expand All @@ -269,53 +197,22 @@ def _build_graph(
nodes_predicates[dependency.node],
value=dependency.branch_value,
)
dependent_ff = self._goal_to_fitness_function(fitness_functions, goal)
self._graph.add_edge(dependent_ff, fitness)

# Sanity check
assert {n for n in self._graph.nodes if self._graph.in_degree(n) == 0}.issubset(
self._root_branches
), "Root branches cannot depend on other branches."
Comment thread
LuKrO2011 marked this conversation as resolved.

Comment thread
LuKrO2011 marked this conversation as resolved.
Outdated
@property
def dot(self):
"""Return DOT representation of this graph."""
dot = to_pydot(self._graph)
return dot.to_string()
Comment thread
LuKrO2011 marked this conversation as resolved.
dependent_ff = self._goal_to_fitness_function(
fitness_functions, goal
)
self._graph.add_edge(dependent_ff, fitness)

@property
def root_branches(self) -> OrderedSet[bg.BranchCoverageTestFitness]:
"""Return the root branches, i.e., the fitness functions without conditions."""
def root_branches(self):
return OrderedSet(self._root_branches)

def get_structural_children(self, fitness_function):
return OrderedSet(self._graph.successors(fitness_function))

@staticmethod
def _goal_to_fitness_function(
search_in: OrderedSet[bg.BranchCoverageTestFitness], goal: bg.BranchGoal
) -> bg.BranchCoverageTestFitness:
"""Little helper to find the fitness function associated with a certain goal.

Args:
search_in: The list to search in
goal: The goal to search for

Returns:
The found fitness function.
"""
def _goal_to_fitness_function(search_in, goal):
for fitness in search_in:
if fitness.goal == goal:
return fitness
raise RuntimeError(f"Could not find fitness function for goal: {goal}")

def get_structural_children(
self, fitness_function: bg.BranchCoverageTestFitness
) -> OrderedSet[bg.BranchCoverageTestFitness]:
"""Get the fitness functions that are structural children of the given one.

Args:
fitness_function: The fitness function whose structural children should be
returned.

Returns:
The structural children fitness functions of the given fitness function.
"""
return OrderedSet(self._graph.successors(fitness_function))
raise RuntimeError(f"Goal not found: {goal}")
Loading