diff --git a/opentcs-documentation/src/docs/release-notes/changelog.adoc b/opentcs-documentation/src/docs/release-notes/changelog.adoc index 90648fa56..dbb9416f2 100644 --- a/opentcs-documentation/src/docs/release-notes/changelog.adoc +++ b/opentcs-documentation/src/docs/release-notes/changelog.adoc @@ -26,6 +26,7 @@ This change log lists the most relevant changes for past releases in reverse chr * Bugs fixed: ** Properly check a new plant model for duplicate element names before accepting it. ** Don't allow transport orders to be created with a peripheral reservation token set to the empty string. +** Record allocation futures for retried tasks in `PendingAllocationManager` to ensure proper tracking and cancellation. * Changes affecting developers: ** Mark layout coordinates of points and locations for removal with the next major version as they are apparently not really used in practice. The model coordinates of points and locations should be used instead. diff --git a/opentcs-strategies-default/src/guiceConfig/java/org/opentcs/strategies/basic/scheduling/DefaultSchedulerModule.java b/opentcs-strategies-default/src/guiceConfig/java/org/opentcs/strategies/basic/scheduling/DefaultSchedulerModule.java index 0f61d732c..bd6d2cfa2 100644 --- a/opentcs-strategies-default/src/guiceConfig/java/org/opentcs/strategies/basic/scheduling/DefaultSchedulerModule.java +++ b/opentcs-strategies-default/src/guiceConfig/java/org/opentcs/strategies/basic/scheduling/DefaultSchedulerModule.java @@ -34,6 +34,7 @@ protected void configure() { private void configureSchedulerDependencies() { bind(ReservationPool.class).in(Singleton.class); + bind(PendingAllocationManager.class).in(Singleton.class); Multibinder moduleBinder = schedulerModuleBinder(); moduleBinder.addBinding().to(SingleVehicleBlockModule.class); diff --git a/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/AllocatorTask.java b/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/AllocatorTask.java index 4da5f74f7..87fad06a3 100644 --- a/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/AllocatorTask.java +++ b/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/AllocatorTask.java @@ -5,8 +5,8 @@ import static java.util.Objects.requireNonNull; import jakarta.annotation.Nonnull; -import java.util.Queue; import java.util.Set; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import org.opentcs.components.kernel.Scheduler; import org.opentcs.components.kernel.Scheduler.Client; @@ -35,9 +35,9 @@ class AllocatorTask */ private final Scheduler.Module allocationAdvisor; /** - * Allocations deferred because they couldn't be granted, yet. + * The pending allocation manager. */ - private final Queue deferredAllocations; + private final PendingAllocationManager pendingAllocationManager; /** * Executes tasks. */ @@ -58,7 +58,7 @@ class AllocatorTask @Nonnull ReservationPool reservationPool, @Nonnull - Queue deferredAllocations, + PendingAllocationManager allocationTracker, @Nonnull Scheduler.Module allocationAdvisor, @Nonnull @@ -70,7 +70,8 @@ class AllocatorTask AllocatorCommand command ) { this.reservationPool = requireNonNull(reservationPool, "reservationPool"); - this.deferredAllocations = requireNonNull(deferredAllocations, "deferredAllocations"); + this.pendingAllocationManager + = requireNonNull(allocationTracker, "pendingAllocationManager"); this.allocationAdvisor = requireNonNull(allocationAdvisor, "allocationAdvisor"); this.kernelExecutor = requireNonNull(kernelExecutor, "kernelExecutor"); this.globalSyncObject = requireNonNull(globalSyncObject, "globalSyncObject"); @@ -101,7 +102,7 @@ else if (command instanceof AllocatorCommand.AllocationsReleased) { private void processAllocate(AllocatorCommand.Allocate command) { if (!tryAllocate(command)) { LOG.debug("{}: Resources unavailable, deferring allocation...", command.getClient().getId()); - deferredAllocations.add(command); + pendingAllocationManager.addDeferredAllocation(command); return; } @@ -215,18 +216,18 @@ private void undoAllocate(Client client, Set> resources) { * Moves all waiting allocations back into the incoming queue so they can be rechecked. */ private void scheduleRetryWaitingAllocations() { - for (AllocatorCommand.Allocate allocate : deferredAllocations) { - kernelExecutor.submit( + for (AllocatorCommand.Allocate allocate : pendingAllocationManager.drainDeferredAllocations()) { + Future future = kernelExecutor.submit( new AllocatorTask( reservationPool, - deferredAllocations, + pendingAllocationManager, allocationAdvisor, kernelExecutor, globalSyncObject, allocate ) ); + pendingAllocationManager.addAllocationFuture(allocate.getClient(), future); } - deferredAllocations.clear(); } } diff --git a/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/DefaultScheduler.java b/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/DefaultScheduler.java index ddbb33d0f..73285890f 100644 --- a/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/DefaultScheduler.java +++ b/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/DefaultScheduler.java @@ -7,15 +7,11 @@ import jakarta.annotation.Nonnull; import jakarta.inject.Inject; -import java.util.ArrayList; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import org.opentcs.components.kernel.ResourceAllocationException; @@ -58,9 +54,9 @@ public class DefaultScheduler */ private final ReservationPool reservationPool; /** - * Allocations deferred because they couldn't be granted, yet. + * The pending allocation manager. */ - private final Queue deferredAllocations = new LinkedBlockingQueue<>(); + private final PendingAllocationManager pendingAllocationManager; /** * Executes scheduling tasks. */ @@ -73,10 +69,6 @@ public class DefaultScheduler * A global object to be used for synchronization within the kernel. */ private final Object globalSyncObject; - /** - * Allocations that are scheduled for execution on the kernel executor. - */ - private final Map>> allocateFutures = new HashMap<>(); /** * Indicates whether this component is enabled. */ @@ -95,6 +87,7 @@ public class DefaultScheduler public DefaultScheduler( AllocationAdvisor allocationAdvisor, ReservationPool reservationPool, + PendingAllocationManager allocationTracker, @KernelExecutor ScheduledExecutorService kernelExecutor, @ApplicationEventBus @@ -104,6 +97,8 @@ public DefaultScheduler( ) { this.allocationAdvisor = requireNonNull(allocationAdvisor, "allocationAdvisor"); this.reservationPool = requireNonNull(reservationPool, "reservationPool"); + this.pendingAllocationManager + = requireNonNull(allocationTracker, "pendingAllocationManager"); this.kernelExecutor = requireNonNull(kernelExecutor, "kernelExecutor"); this.eventBus = requireNonNull(eventBus, "eventBus"); this.globalSyncObject = requireNonNull(globalSyncObject, "globalSyncObject"); @@ -117,6 +112,7 @@ public void initialize() { reservationPool.clear(); allocationAdvisor.initialize(); + pendingAllocationManager.initialize(); eventBus.subscribe(this); @@ -137,6 +133,7 @@ public void terminate() { eventBus.unsubscribe(this); allocationAdvisor.terminate(); + pendingAllocationManager.terminate(); initialized = false; } @@ -172,7 +169,7 @@ public void allocate(Client client, Set> resources) { Future allocateFuture = kernelExecutor.submit( new AllocatorTask( reservationPool, - deferredAllocations, + pendingAllocationManager, allocationAdvisor, kernelExecutor, globalSyncObject, @@ -181,12 +178,7 @@ public void allocate(Client client, Set> resources) { ); // Remember the allocate future in case we need to cancel it. - addAllocateFuture(client, allocateFuture); - - // Clean up the collection of allocate futures and remove futures that have already been - // completed. This could also be done in other places, but doing it for every new allocation - // should be sufficient. - removeCompletedAllocateFutures(client); + pendingAllocationManager.addAllocationFuture(client, allocateFuture); } } @@ -240,7 +232,7 @@ public void free(Client client, Set> resources) { .collect(Collectors.toCollection(HashSet::new)); new AllocatorTask( reservationPool, - deferredAllocations, + pendingAllocationManager, allocationAdvisor, kernelExecutor, globalSyncObject, @@ -250,7 +242,7 @@ public void free(Client client, Set> resources) { kernelExecutor.submit( new AllocatorTask( reservationPool, - deferredAllocations, + pendingAllocationManager, allocationAdvisor, kernelExecutor, globalSyncObject, @@ -272,7 +264,7 @@ public void freeAll(Client client) { new AllocatorTask( reservationPool, - deferredAllocations, + pendingAllocationManager, allocationAdvisor, kernelExecutor, globalSyncObject, @@ -282,7 +274,7 @@ public void freeAll(Client client) { kernelExecutor.submit( new AllocatorTask( reservationPool, - deferredAllocations, + pendingAllocationManager, allocationAdvisor, kernelExecutor, globalSyncObject, @@ -296,8 +288,7 @@ public void clearPendingAllocations(Client client) { requireNonNull(client, "client"); synchronized (globalSyncObject) { LOG.debug("{}: Clearing pending allocation requests...", client.getId()); - deferredAllocations.removeIf(allocate -> client.equals(allocate.getClient())); - cancelPendingAllocateFutures(client); + pendingAllocationManager.clearPendingAllocations(client); } } @@ -305,7 +296,7 @@ public void clearPendingAllocations(Client client) { public void reschedule() { new AllocatorTask( reservationPool, - deferredAllocations, + pendingAllocationManager, allocationAdvisor, kernelExecutor, globalSyncObject, @@ -335,7 +326,7 @@ public void preparationSuccessful( new AllocatorTask( reservationPool, - deferredAllocations, + pendingAllocationManager, allocationAdvisor, kernelExecutor, globalSyncObject, @@ -363,36 +354,6 @@ public void onEvent(Object event) { } } - private void addAllocateFuture(Client client, Future allocateFuture) { - if (!allocateFutures.containsKey(client)) { - allocateFutures.put(client, new ArrayList<>()); - } - - allocateFutures.get(client).add(allocateFuture); - } - - private void removeCompletedAllocateFutures(Client client) { - if (!allocateFutures.containsKey(client)) { - return; - } - - allocateFutures.get(client).removeAll( - allocateFutures.get(client).stream() - .filter(future -> future.isDone()) - .collect(Collectors.toList()) - ); - } - - private void cancelPendingAllocateFutures(Client client) { - if (!allocateFutures.containsKey(client)) { - return; - } - - allocateFutures.get(client).stream() - .filter(future -> !future.isDone()) - .forEach(future -> future.cancel(false)); - } - /** * A dummy client for cases in which we need to provide a client but do not have a real one. */ diff --git a/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/PendingAllocationManager.java b/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/PendingAllocationManager.java new file mode 100644 index 000000000..a94c059f4 --- /dev/null +++ b/opentcs-strategies-default/src/main/java/org/opentcs/strategies/basic/scheduling/PendingAllocationManager.java @@ -0,0 +1,204 @@ +// SPDX-FileCopyrightText: The openTCS Authors +// SPDX-License-Identifier: MIT + +package org.opentcs.strategies.basic.scheduling; + +import static java.util.Objects.requireNonNull; + +import jakarta.annotation.Nonnull; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import org.opentcs.components.Lifecycle; +import org.opentcs.components.kernel.Scheduler; + +/** + * Manages pending allocations and allocation futures scheduled on the kernel executor. + *

+ * This implementation assumes single-threaded access and is not thread-safe. + */ +public class PendingAllocationManager + implements + Lifecycle { + + /** + * Allocations deferred because they couldn't be granted, yet. + */ + private final Queue deferredAllocations = new ArrayDeque<>(); + /** + * Allocations that are scheduled for execution on the kernel executor. + */ + private final Map>> allocationFutures = new HashMap<>(); + /** + * This instance's initialized flag. + */ + private boolean initialized; + + /** + * Creates an instance. + */ + public PendingAllocationManager() { + + } + + @Override + public void initialize() { + if (isInitialized()) { + return; + } + + clear(); + this.initialized = true; + } + + @Override + public boolean isInitialized() { + return initialized; + } + + @Override + public void terminate() { + if (!isInitialized()) { + return; + } + + clear(); + this.initialized = false; + } + + /** + * Clears deferred allocations and all allocation futures. + */ + private void clear() { + deferredAllocations.clear(); + + cancelAllPendingAllocateFutures(); + allocationFutures.clear(); + } + + /** + * Clears pending allocations (deferred + futures) for the given client. + * + * @param client The scheduler's client. + */ + public void clearPendingAllocations( + @Nonnull + Scheduler.Client client + ) { + requireNonNull(client, "client"); + + clearDeferredAllocations(client); + cancelPendingAllocateFutures(client); + removeCompletedAllocateFutures(client); + } + + private void clearDeferredAllocations(Scheduler.Client client) { + deferredAllocations.removeIf(allocate -> client.equals(allocate.getClient())); + } + + /** + * Adds a deferred allocation to the queue. + * + * @param allocate The allocate command. + */ + public void addDeferredAllocation( + @Nonnull + AllocatorCommand.Allocate allocate + ) { + requireNonNull(allocate, "allocate"); + + deferredAllocations.add(allocate); + } + + /** + * Drains all deferred allocations. + *

+ * Retrieves all deferred allocations and clears the internal queue. + * + * @return All deferred allocations that were pending. + */ + public List drainDeferredAllocations() { + List allocations = new ArrayList<>(deferredAllocations); + deferredAllocations.clear(); + return allocations; + } + + /** + * Returns the number of pending allocation futures for each client. + * + * @return A map from each scheduler client to the number of its pending allocation futures. + */ + public Map countPendingAllocationFutures() { + return allocationFutures.entrySet() + .stream() + .map(entry -> Map.entry(entry.getKey(), countUndoneFutures(entry.getValue()))) + .filter(entry -> entry.getValue() > 0) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private int countUndoneFutures(List> futures) { + return (int) futures.stream() + .filter(f -> !f.isDone()) + .count(); + } + + /** + * Adds an allocation future for the given client. + *

+ * This method also removes any previously completed futures for the client. + * + * @param client The scheduler's client. + * @param allocationFuture The allocate future. + */ + public void addAllocationFuture( + @Nonnull + Scheduler.Client client, + @Nonnull + Future allocationFuture + ) { + requireNonNull(client, "client"); + requireNonNull(allocationFuture, "allocationFuture"); + + // Clean up the collection of allocate futures and remove futures that have already been + // completed. This could also be done in other places, but doing it for every new allocation + // should be sufficient. + removeCompletedAllocateFutures(client); + + allocationFutures + .computeIfAbsent(client, v -> new ArrayList<>()) + .add(allocationFuture); + } + + private void removeCompletedAllocateFutures(Scheduler.Client client) { + List> futures = allocationFutures.get(client); + if (futures != null) { + futures.removeIf(Future::isDone); + } + } + + private void cancelPendingAllocateFutures(Scheduler.Client client) { + if (!allocationFutures.containsKey(client)) { + return; + } + + allocationFutures.get(client).stream() + .filter(future -> !future.isDone()) + .forEach(future -> future.cancel(false)); + } + + private void cancelAllPendingAllocateFutures() { + allocationFutures.values() + .forEach(futures -> futures.forEach(future -> { + if (!future.isDone()) { + future.cancel(false); + } + }) + ); + } + +} diff --git a/opentcs-strategies-default/src/test/java/org/opentcs/strategies/basic/scheduling/PendingAllocationManagerTest.java b/opentcs-strategies-default/src/test/java/org/opentcs/strategies/basic/scheduling/PendingAllocationManagerTest.java new file mode 100644 index 000000000..5be4f2028 --- /dev/null +++ b/opentcs-strategies-default/src/test/java/org/opentcs/strategies/basic/scheduling/PendingAllocationManagerTest.java @@ -0,0 +1,154 @@ +// SPDX-FileCopyrightText: The openTCS Authors +// SPDX-License-Identifier: MIT + +package org.opentcs.strategies.basic.scheduling; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Future; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opentcs.components.kernel.Scheduler; + +/** + * Tests for {@link PendingAllocationManager}. + */ +class PendingAllocationManagerTest { + + private PendingAllocationManager pendingAllocationManager; + + private Scheduler.Client client; + + private AllocatorCommand.Allocate allocateCommand; + + @BeforeEach + void setUp() { + pendingAllocationManager = new PendingAllocationManager(); + + client = mock(Scheduler.Client.class); + + allocateCommand = new AllocatorCommand.Allocate(client, Set.of()); + } + + @Test + void shouldInitializeProperly() { + pendingAllocationManager.initialize(); + assertThat(pendingAllocationManager.isInitialized()).isTrue(); + assertThat(pendingAllocationManager.drainDeferredAllocations()) + .isEmpty(); + assertThat(pendingAllocationManager.countPendingAllocationFutures()) + .isEmpty(); + } + + @Test + void shouldTerminateProperly() { + pendingAllocationManager.initialize(); + pendingAllocationManager.terminate(); + + assertThat(pendingAllocationManager.isInitialized()) + .isFalse(); + assertThat(pendingAllocationManager.drainDeferredAllocations()) + .isEmpty(); + assertThat(pendingAllocationManager.countPendingAllocationFutures()) + .isEmpty(); + } + + + @Test + void shouldAddDeferredAllocation() { + pendingAllocationManager.addDeferredAllocation(allocateCommand); + + assertThat(pendingAllocationManager.drainDeferredAllocations()) + .hasSize(1) + .contains(allocateCommand); + } + + @Test + void shouldClearPendingAllocationsForExistingClient() { + pendingAllocationManager.addDeferredAllocation(allocateCommand); + + pendingAllocationManager.clearPendingAllocations(client); + + assertThat(pendingAllocationManager.drainDeferredAllocations()) + .isEmpty(); + } + + @Test + void shouldNotClearPendingAllocationsForUnknownClient() { + pendingAllocationManager.addDeferredAllocation(allocateCommand); + + Scheduler.Client unknownClient = mock(Scheduler.Client.class); + + pendingAllocationManager.clearPendingAllocations(unknownClient); + assertThat(pendingAllocationManager.drainDeferredAllocations()) + .hasSize(1) + .contains(allocateCommand); + } + + @Test + void shouldAddAllocationFuture() { + Future future = mock(Future.class); + + Map counts + = pendingAllocationManager.countPendingAllocationFutures(); + assertThat(counts.getOrDefault(client, 0)) + .isEqualTo(0); + + pendingAllocationManager.addAllocationFuture(client, future); + + assertThat(pendingAllocationManager.countPendingAllocationFutures().get(client)) + .isEqualTo(1); + } + + @Test + void shouldRemoveCompletedAllocationsWhenAddingNewFuture() { + Future future1 = mock(Future.class); + pendingAllocationManager.addAllocationFuture(client, future1); + + when(future1.isDone()).thenReturn(true); + + Future future2 = mock(Future.class); + pendingAllocationManager.addAllocationFuture(client, future2); + Map counts + = pendingAllocationManager.countPendingAllocationFutures(); + assertThat(counts.getOrDefault(client, 0)) + .isEqualTo(1); + } + + @Test + void shouldClearDeferredAndCancelPendingFuturesForClient() { + pendingAllocationManager.addDeferredAllocation(allocateCommand); + + Future future1 = mock(Future.class); + Future future2 = mock(Future.class); + + when(future1.isDone()).thenReturn(false); + when(future2.isDone()).thenReturn(true); + + pendingAllocationManager.addAllocationFuture(client, future1); + pendingAllocationManager.addAllocationFuture(client, future2); + + pendingAllocationManager.clearPendingAllocations(client); + Map counts + = pendingAllocationManager.countPendingAllocationFutures(); + + assertThat(pendingAllocationManager.drainDeferredAllocations()) + .isEmpty(); + assertThat(counts.getOrDefault(client, 0)) + .isEqualTo(1); + + verify(future1, times(1)) + .cancel(false); + verify(future2, times(0)) + .cancel(false); + + } + + +}