Skip to content

Commit 29e73b5

Browse files
authored
CAMEL-20199: Add JMX support for virtual thread executors (#21701)
Virtual thread executors (ThreadPerTaskExecutor) were silently skipped from JMX registration because LifecycleStrategy.onThreadPoolAdd only accepted ThreadPoolExecutor. This adds overloaded methods for ExecutorService to register non-ThreadPoolExecutor instances in JMX. New ManagedVirtualThreadExecutorMBean exposes basic attributes: id, sourceId, routeId, isVirtualThread, and isShutdown.
1 parent 20daa27 commit 29e73b5

10 files changed

Lines changed: 370 additions & 1 deletion

File tree

core/camel-api/src/main/java/org/apache/camel/spi/LifecycleStrategy.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.camel.spi;
1818

1919
import java.util.Collection;
20+
import java.util.concurrent.ExecutorService;
2021
import java.util.concurrent.ThreadPoolExecutor;
2122

2223
import org.apache.camel.CamelContext;
@@ -209,4 +210,30 @@ void onThreadPoolAdd(
209210
*/
210211
void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor threadPool);
211212

213+
/**
214+
* Notification on adding an executor service (such as a virtual thread executor) that is not a
215+
* {@link ThreadPoolExecutor}.
216+
*
217+
* @param camelContext the camel context
218+
* @param executorService the executor service
219+
* @param id id of the thread pool (can be null in special cases)
220+
* @param sourceId id of the source creating the thread pool (can be null in special cases)
221+
* @param routeId id of the route for the source (is null if no source)
222+
* @param threadPoolProfileId id of the thread pool profile, if used for creating this thread pool (can be null)
223+
*/
224+
default void onThreadPoolAdd(
225+
CamelContext camelContext, ExecutorService executorService, String id,
226+
String sourceId, String routeId, String threadPoolProfileId) {
227+
}
228+
229+
/**
230+
* Notification on removing an executor service (such as a virtual thread executor) that is not a
231+
* {@link ThreadPoolExecutor}.
232+
*
233+
* @param camelContext the camel context
234+
* @param executorService the executor service
235+
*/
236+
default void onThreadPoolRemove(CamelContext camelContext, ExecutorService executorService) {
237+
}
238+
212239
}

core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectNameStrategy.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.camel.spi;
1818

19+
import java.util.concurrent.ExecutorService;
1920
import java.util.concurrent.ThreadPoolExecutor;
2021

2122
import javax.management.MalformedObjectNameException;
@@ -78,6 +79,12 @@ ObjectName getObjectNameForClusterService(CamelContext context, CamelClusterServ
7879
ObjectName getObjectNameForThreadPool(CamelContext context, ThreadPoolExecutor threadPool, String id, String sourceId)
7980
throws MalformedObjectNameException;
8081

82+
default ObjectName getObjectNameForThreadPool(
83+
CamelContext context, ExecutorService executorService, String id, String sourceId)
84+
throws MalformedObjectNameException {
85+
return null;
86+
}
87+
8188
ObjectName getObjectNameForEventNotifier(CamelContext context, EventNotifier eventNotifier)
8289
throws MalformedObjectNameException;
8390
}

core/camel-api/src/main/java/org/apache/camel/spi/ManagementObjectStrategy.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.camel.spi;
1818

19+
import java.util.concurrent.ExecutorService;
1920
import java.util.concurrent.ThreadPoolExecutor;
2021

2122
import org.apache.camel.CamelContext;
@@ -67,5 +68,11 @@ Object getManagedObjectForThreadPool(
6768
CamelContext context, ThreadPoolExecutor threadPool,
6869
String id, String sourceId, String routeId, String threadPoolProfileId);
6970

71+
default Object getManagedObjectForThreadPool(
72+
CamelContext context, ExecutorService executorService,
73+
String id, String sourceId, String routeId, String threadPoolProfileId) {
74+
return null;
75+
}
76+
7077
Object getManagedObjectForEventNotifier(CamelContext context, EventNotifier eventNotifier);
7178
}

core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/BaseExecutorServiceManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,11 @@ private void doRemove(ExecutorService executorService, boolean failSafe) {
372372
for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
373373
lifecycle.onThreadPoolRemove(camelContext, threadPool);
374374
}
375+
} else {
376+
// for non-ThreadPoolExecutor instances (e.g., virtual thread executors)
377+
for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
378+
lifecycle.onThreadPoolRemove(camelContext, executorService);
379+
}
375380
}
376381

377382
// remove reference as its shutdown (do not remove if fail-safe)
@@ -592,6 +597,11 @@ private void onThreadPoolCreated(ExecutorService executorService, Object source,
592597
for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
593598
lifecycle.onThreadPoolAdd(camelContext, threadPool, id, sourceId, routeId, threadPoolProfileId);
594599
}
600+
} else {
601+
// for non-ThreadPoolExecutor instances (e.g., virtual thread executors)
602+
for (LifecycleStrategy lifecycle : camelContext.getLifecycleStrategies()) {
603+
lifecycle.onThreadPoolAdd(camelContext, executorService, id, sourceId, routeId, threadPoolProfileId);
604+
}
595605
}
596606

597607
// now call strategy to allow custom logic
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.api.management.mbean;
18+
19+
import org.apache.camel.api.management.ManagedAttribute;
20+
21+
public interface ManagedVirtualThreadExecutorMBean {
22+
23+
@ManagedAttribute(description = "Camel ID")
24+
String getCamelId();
25+
26+
@ManagedAttribute(description = "Camel ManagementName")
27+
String getCamelManagementName();
28+
29+
@ManagedAttribute(description = "Thread Pool ID")
30+
String getId();
31+
32+
@ManagedAttribute(description = "ID of source for creating Thread Pool")
33+
String getSourceId();
34+
35+
@ManagedAttribute(description = "Route ID for the source, which created the Thread Pool")
36+
String getRouteId();
37+
38+
@ManagedAttribute(description = "ID of the thread pool profile which this pool is based upon")
39+
String getThreadPoolProfileId();
40+
41+
@ManagedAttribute(description = "Whether this executor uses virtual threads")
42+
boolean isVirtualThread();
43+
44+
@ManagedAttribute(description = "Is shutdown")
45+
boolean isShutdown();
46+
}

core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectNameStrategy.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.camel.management;
1818

1919
import java.net.UnknownHostException;
20+
import java.util.concurrent.ExecutorService;
2021
import java.util.concurrent.ThreadPoolExecutor;
2122

2223
import javax.management.MalformedObjectNameException;
@@ -54,6 +55,7 @@
5455
import org.apache.camel.management.mbean.ManagedSupervisingRouteController;
5556
import org.apache.camel.management.mbean.ManagedThreadPool;
5657
import org.apache.camel.management.mbean.ManagedTracer;
58+
import org.apache.camel.management.mbean.ManagedVirtualThreadExecutor;
5759
import org.apache.camel.spi.DataFormat;
5860
import org.apache.camel.spi.EventNotifier;
5961
import org.apache.camel.spi.ManagementObjectNameStrategy;
@@ -162,6 +164,9 @@ public ObjectName getObjectName(Object managedObject) throws MalformedObjectName
162164
objectName = getObjectNameForTracer(mt.getContext(), mt.getTracer());
163165
} else if (managedObject instanceof ManagedThreadPool mes) {
164166
objectName = getObjectNameForThreadPool(mes.getContext(), mes.getThreadPool(), mes.getId(), mes.getSourceId());
167+
} else if (managedObject instanceof ManagedVirtualThreadExecutor mvte) {
168+
objectName = getObjectNameForThreadPool(
169+
mvte.getContext(), mvte.getExecutorService(), mvte.getId(), mvte.getSourceId());
165170
} else if (managedObject instanceof ManagedClusterService mcs) {
166171
objectName = getObjectNameForClusterService(mcs.getContext(), mcs.getService());
167172
} else if (managedObject instanceof ManagedService ms) {
@@ -422,6 +427,18 @@ public ObjectName getObjectNameForClusterService(CamelContext context, CamelClus
422427
public ObjectName getObjectNameForThreadPool(
423428
CamelContext context, ThreadPoolExecutor threadPool, String id, String sourceId)
424429
throws MalformedObjectNameException {
430+
return getObjectNameForThreadPool(context, id, sourceId);
431+
}
432+
433+
@Override
434+
public ObjectName getObjectNameForThreadPool(
435+
CamelContext context, ExecutorService executorService, String id, String sourceId)
436+
throws MalformedObjectNameException {
437+
return getObjectNameForThreadPool(context, id, sourceId);
438+
}
439+
440+
private ObjectName getObjectNameForThreadPool(CamelContext context, String id, String sourceId)
441+
throws MalformedObjectNameException {
425442
StringBuilder buffer = new StringBuilder();
426443
buffer.append(domainName).append(":");
427444
buffer.append(KEY_CONTEXT).append("=").append(getContextId(context)).append(",");

core/camel-management/src/main/java/org/apache/camel/management/DefaultManagementObjectStrategy.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.camel.management;
1818

19+
import java.util.concurrent.ExecutorService;
1920
import java.util.concurrent.ThreadPoolExecutor;
2021

2122
import org.apache.camel.CamelContext;
@@ -111,6 +112,7 @@
111112
import org.apache.camel.management.mbean.ManagedTransformer;
112113
import org.apache.camel.management.mbean.ManagedUnmarshal;
113114
import org.apache.camel.management.mbean.ManagedValidate;
115+
import org.apache.camel.management.mbean.ManagedVirtualThreadExecutor;
114116
import org.apache.camel.management.mbean.ManagedWeightedLoadBalancer;
115117
import org.apache.camel.management.mbean.ManagedWireTapProcessor;
116118
import org.apache.camel.model.ExpressionNode;
@@ -284,6 +286,16 @@ public Object getManagedObjectForThreadPool(
284286
return mtp;
285287
}
286288

289+
@Override
290+
public Object getManagedObjectForThreadPool(
291+
CamelContext context, ExecutorService executorService,
292+
String id, String sourceId, String routeId, String threadPoolProfileId) {
293+
ManagedVirtualThreadExecutor mvte
294+
= new ManagedVirtualThreadExecutor(context, executorService, id, sourceId, routeId, threadPoolProfileId);
295+
mvte.init(context.getManagementStrategy());
296+
return mvte;
297+
}
298+
287299
@Override
288300
public Object getManagedObjectForEventNotifier(CamelContext context, EventNotifier eventNotifier) {
289301
ManagedEventNotifier men = new ManagedEventNotifier(context, eventNotifier);

core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Set;
27+
import java.util.concurrent.ExecutorService;
2728
import java.util.concurrent.ThreadPoolExecutor;
2829

2930
import javax.management.JMException;
@@ -149,7 +150,7 @@ public class JmxManagementLifecycleStrategy extends ServiceSupport implements Li
149150
private final Set<String> knowRouteIds = new HashSet<>();
150151
private final Map<BacklogTracer, ManagedBacklogTracer> managedBacklogTracers = new HashMap<>();
151152
private final Map<DefaultBacklogDebugger, ManagedBacklogDebugger> managedBacklogDebuggers = new HashMap<>();
152-
private final Map<ThreadPoolExecutor, Object> managedThreadPools = new HashMap<>();
153+
private final Map<Object, Object> managedThreadPools = new HashMap<>();
153154

154155
public JmxManagementLifecycleStrategy() {
155156
}
@@ -815,6 +816,62 @@ public void onThreadPoolRemove(CamelContext camelContext, ThreadPoolExecutor thr
815816
}
816817
}
817818

819+
@Override
820+
public void onThreadPoolAdd(
821+
CamelContext camelContext, ExecutorService executorService, String id,
822+
String sourceId, String routeId, String threadPoolProfileId) {
823+
824+
if (!initialized) {
825+
preServices
826+
.add(lf -> lf.onThreadPoolAdd(camelContext, executorService, id, sourceId, routeId,
827+
threadPoolProfileId));
828+
return;
829+
}
830+
831+
if (!shouldRegister(executorService, null)) {
832+
return;
833+
}
834+
835+
Object mtp = getManagementObjectStrategy().getManagedObjectForThreadPool(camelContext, executorService, id, sourceId,
836+
routeId, threadPoolProfileId);
837+
if (mtp == null) {
838+
return;
839+
}
840+
841+
if (getManagementStrategy().isManaged(mtp)) {
842+
LOG.trace("The executor service is already managed: {}", executorService);
843+
return;
844+
}
845+
846+
try {
847+
manageObject(mtp);
848+
managedThreadPools.put(executorService, mtp);
849+
} catch (Exception e) {
850+
LOG.warn("Could not register executor service: {} as ThreadPool MBean.", executorService, e);
851+
}
852+
}
853+
854+
@Override
855+
public void onThreadPoolRemove(CamelContext camelContext, ExecutorService executorService) {
856+
if (!initialized) {
857+
return;
858+
}
859+
860+
Object mtp = managedThreadPools.remove(executorService);
861+
if (mtp != null) {
862+
if (!getManagementStrategy().isManaged(mtp)) {
863+
LOG.trace("The executor service is not managed: {}", executorService);
864+
return;
865+
}
866+
867+
try {
868+
unmanageObject(mtp);
869+
} catch (Exception e) {
870+
LOG.warn("Could not unregister ThreadPool MBean", e);
871+
}
872+
}
873+
}
874+
818875
@Override
819876
public void onRouteContextCreate(Route route) {
820877
// Create a map (ProcessorType -> PerformanceCounter)

0 commit comments

Comments
 (0)