Skip to content

Commit 06bfff6

Browse files
authored
CAMEL-22524: Add HazelcastRoutePolicy integration test
Automate the manual HazelcastRoutePolicyMain test as a proper JUnit 5 IT: - 3 concurrent nodes with deterministic staggered startup, each with its own embedded HazelcastInstance and HazelcastRoutePolicy - Verifies distributed lock-based leader election: all nodes eventually acquire the lock and execute their routes - Thread-safe design: CopyOnWriteArrayList, local state, no static mutables - Try-with-resources for CamelContext; manual finally for HazelcastInstance - @timeout(2 min) to prevent CI hangs - AssertJ assertions with descriptive messages - Added assertj-core test-scoped dependency to camel-hazelcast
1 parent 5b62707 commit 06bfff6

2 files changed

Lines changed: 58 additions & 45 deletions

File tree

components/camel-hazelcast/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@
7979
<version>${project.version}</version>
8080
<scope>test</scope>
8181
</dependency>
82+
<dependency>
83+
<groupId>org.assertj</groupId>
84+
<artifactId>assertj-core</artifactId>
85+
<scope>test</scope>
86+
</dependency>
8287
</dependencies>
8388

8489
<profiles>

components/camel-hazelcast/src/test/java/org/apache/camel/component/hazelcast/policy/HazelcastRoutePolicyIT.java

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,13 @@
1616
*/
1717
package org.apache.camel.component.hazelcast.policy;
1818

19-
import java.util.ArrayList;
2019
import java.util.List;
20+
import java.util.concurrent.CopyOnWriteArrayList;
2121
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.ExecutorService;
2223
import java.util.concurrent.Executors;
23-
import java.util.concurrent.ScheduledExecutorService;
2424
import java.util.concurrent.ThreadLocalRandom;
2525
import java.util.concurrent.TimeUnit;
26-
import java.util.stream.IntStream;
2726

2827
import com.hazelcast.config.Config;
2928
import com.hazelcast.core.Hazelcast;
@@ -32,85 +31,94 @@
3231
import org.apache.camel.impl.DefaultCamelContext;
3332
import org.apache.camel.test.infra.hazelcast.services.HazelcastService;
3433
import org.apache.camel.test.infra.hazelcast.services.HazelcastServiceFactory;
35-
import org.junit.jupiter.api.Assertions;
3634
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.api.Timeout;
3736
import org.junit.jupiter.api.extension.RegisterExtension;
3837
import org.slf4j.Logger;
3938
import org.slf4j.LoggerFactory;
4039

40+
import static org.assertj.core.api.Assertions.assertThat;
41+
4142
/**
4243
* Integration test for {@link HazelcastRoutePolicy} that verifies leader election and route management using Hazelcast
4344
* distributed locks.
4445
*/
4546
public class HazelcastRoutePolicyIT {
4647

4748
private static final Logger LOGGER = LoggerFactory.getLogger(HazelcastRoutePolicyIT.class);
49+
private static final List<String> CLIENTS = List.of("0", "1", "2");
4850

4951
@RegisterExtension
5052
public static HazelcastService hazelcastService = HazelcastServiceFactory.createService();
5153

52-
private static final List<String> CLIENTS = IntStream.range(0, 3).mapToObj(Integer::toString).toList();
53-
private static final List<String> RESULTS = new ArrayList<>();
54-
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(CLIENTS.size() * 2);
55-
private static final CountDownLatch LATCH = new CountDownLatch(CLIENTS.size());
56-
5754
@Test
58-
public void test() throws Exception {
55+
@Timeout(value = 2, unit = TimeUnit.MINUTES)
56+
public void testLeaderElectionWithMultipleNodes() throws Exception {
57+
List<String> results = new CopyOnWriteArrayList<>();
58+
CountDownLatch latch = new CountDownLatch(CLIENTS.size());
59+
ExecutorService executor = Executors.newFixedThreadPool(CLIENTS.size());
60+
5961
for (String id : CLIENTS) {
60-
SCHEDULER.submit(() -> run(id));
62+
executor.submit(() -> run(id, results, latch));
6163
}
6264

63-
LATCH.await(1, TimeUnit.MINUTES);
64-
SCHEDULER.shutdownNow();
65+
assertThat(latch.await(1, TimeUnit.MINUTES)).as("All nodes should complete within timeout").isTrue();
66+
executor.shutdown();
67+
assertThat(executor.awaitTermination(30, TimeUnit.SECONDS)).as("Executor should terminate cleanly").isTrue();
6568

66-
Assertions.assertEquals(CLIENTS.size(), RESULTS.size());
67-
Assertions.assertTrue(RESULTS.containsAll(CLIENTS));
69+
assertThat(results).containsExactlyInAnyOrderElementsOf(CLIENTS);
6870
}
6971

70-
private static void run(String id) {
72+
private static void run(String id, List<String> results, CountDownLatch latch) {
73+
HazelcastInstance instance = null;
7174
try {
7275
int events = ThreadLocalRandom.current().nextInt(2, 6);
7376
CountDownLatch contextLatch = new CountDownLatch(events);
7477

7578
Config config = hazelcastService.createConfiguration(null, 0, "node-" + id, "set");
76-
HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
79+
instance = Hazelcast.newHazelcastInstance(config);
7780

7881
HazelcastRoutePolicy policy = new HazelcastRoutePolicy(instance);
7982
policy.setLockMapName("camel-route-policy");
8083
policy.setLockKey("my-lock");
8184
policy.setLockValue("node-" + id);
8285
policy.setTryLockTimeout(5, TimeUnit.SECONDS);
8386

84-
DefaultCamelContext context = new DefaultCamelContext();
85-
context.disableJMX();
86-
context.getCamelContextExtension().setName("context-" + id);
87-
context.addRoutes(new RouteBuilder() {
88-
@Override
89-
public void configure() {
90-
from("timer:hazelcast?delay=1000&period=1000")
91-
.routeId("route-" + id)
92-
.routePolicy(policy)
93-
.log("From ${routeId}")
94-
.process(e -> contextLatch.countDown());
87+
try (DefaultCamelContext context = new DefaultCamelContext()) {
88+
context.disableJMX();
89+
context.getCamelContextExtension().setName("context-" + id);
90+
context.addRoutes(new RouteBuilder() {
91+
@Override
92+
public void configure() {
93+
from("timer:hazelcast?delay=1000&period=1000")
94+
.routeId("route-" + id)
95+
.routePolicy(policy)
96+
.log("From ${routeId}")
97+
.process(e -> contextLatch.countDown());
98+
}
99+
});
100+
101+
// Deterministic staggered startup based on node index
102+
Thread.sleep(Integer.parseInt(id) * 200L);
103+
104+
LOGGER.info("Starting CamelContext on node: {}", id);
105+
context.start();
106+
LOGGER.info("Started CamelContext on node: {}", id);
107+
108+
if (contextLatch.await(30, TimeUnit.SECONDS)) {
109+
LOGGER.info("Node {} completed {} events successfully", id, events);
110+
results.add(id);
111+
} else {
112+
LOGGER.warn("Node {} timed out waiting for route events (expected {} events)", id, events);
95113
}
96-
});
97-
98-
// Staggered startup
99-
Thread.sleep(ThreadLocalRandom.current().nextInt(500));
100-
101-
LOGGER.info("Starting CamelContext on node: {}", id);
102-
context.start();
103-
LOGGER.info("Started CamelContext on node: {}", id);
104-
105-
contextLatch.await(30, TimeUnit.SECONDS);
106-
107-
LOGGER.info("Shutting down node {}", id);
108-
RESULTS.add(id);
109-
context.stop();
110-
instance.shutdown();
111-
LATCH.countDown();
114+
}
112115
} catch (Exception e) {
113-
LOGGER.warn("{}", e.getMessage(), e);
116+
LOGGER.warn("Node {} failed: {}", id, e.getMessage(), e);
117+
} finally {
118+
if (instance != null) {
119+
instance.shutdown();
120+
}
121+
latch.countDown();
114122
}
115123
}
116124
}

0 commit comments

Comments
 (0)