Skip to content

Commit 05ee1eb

Browse files
committed
Resolve Pr comments
1 parent 0fa1e15 commit 05ee1eb

4 files changed

Lines changed: 267 additions & 3 deletions

File tree

components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ public void onComplete(Exchange exchange) {
167167
try {
168168
PahoMqtt5Consumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
169169
} catch (MqttException e) {
170-
throw new RuntimeException(e);
170+
getExceptionHandler().handleException(
171+
"Error acknowledging MQTT message with ID: " + mqttMessage.getId(),
172+
exchange, e);
171173
}
172174
}
173175

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.component.paho.mqtt5;
18+
19+
import java.util.List;
20+
21+
import org.apache.camel.Exchange;
22+
import org.apache.camel.Processor;
23+
import org.apache.camel.spi.ExceptionHandler;
24+
import org.apache.camel.spi.Synchronization;
25+
import org.apache.camel.test.junit6.CamelTestSupport;
26+
import org.eclipse.paho.mqttv5.client.MqttClient;
27+
import org.eclipse.paho.mqttv5.common.MqttException;
28+
import org.eclipse.paho.mqttv5.common.MqttMessage;
29+
import org.junit.jupiter.api.Test;
30+
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertFalse;
33+
import static org.junit.jupiter.api.Assertions.assertNotNull;
34+
import static org.junit.jupiter.api.Assertions.assertTrue;
35+
import static org.mockito.ArgumentMatchers.any;
36+
import static org.mockito.ArgumentMatchers.contains;
37+
import static org.mockito.ArgumentMatchers.eq;
38+
import static org.mockito.Mockito.doThrow;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.verify;
41+
import static org.mockito.Mockito.verifyNoInteractions;
42+
43+
public class PahoMqtt5ConsumerManualAckExceptionTest extends CamelTestSupport {
44+
45+
@Test
46+
public void testOnCompleteCallsExceptionHandlerOnMqttException() throws Exception {
47+
PahoMqtt5Endpoint endpoint = context.getEndpoint(
48+
"paho-mqtt5:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoMqtt5Endpoint.class);
49+
50+
Processor mockProcessor = mock(Processor.class);
51+
PahoMqtt5Consumer consumer = new PahoMqtt5Consumer(endpoint, mockProcessor);
52+
53+
MqttClient mockClient = mock(MqttClient.class);
54+
doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION))
55+
.when(mockClient).messageArrivedComplete(any(int.class), any(int.class));
56+
consumer.setClient(mockClient);
57+
58+
ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class);
59+
consumer.setExceptionHandler(mockExceptionHandler);
60+
61+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
62+
mqttMessage.setQos(2);
63+
64+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
65+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
66+
67+
assertNotNull(completions);
68+
assertFalse(completions.isEmpty());
69+
70+
// Trigger the onComplete callback (simulating successful exchange processing)
71+
completions.get(0).onComplete(exchange);
72+
73+
// Verify exception handler was called instead of throwing RuntimeException
74+
verify(mockExceptionHandler).handleException(
75+
contains("Error acknowledging MQTT message"),
76+
eq(exchange),
77+
any(MqttException.class));
78+
}
79+
80+
@Test
81+
public void testOnCompleteAcknowledgesSuccessfully() throws Exception {
82+
PahoMqtt5Endpoint endpoint = context.getEndpoint(
83+
"paho-mqtt5:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoMqtt5Endpoint.class);
84+
85+
Processor mockProcessor = mock(Processor.class);
86+
PahoMqtt5Consumer consumer = new PahoMqtt5Consumer(endpoint, mockProcessor);
87+
88+
MqttClient mockClient = mock(MqttClient.class);
89+
consumer.setClient(mockClient);
90+
91+
ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class);
92+
consumer.setExceptionHandler(mockExceptionHandler);
93+
94+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
95+
mqttMessage.setQos(2);
96+
97+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
98+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
99+
100+
assertNotNull(completions);
101+
assertEquals(1, completions.size());
102+
103+
// Trigger the onComplete callback
104+
completions.get(0).onComplete(exchange);
105+
106+
// Verify messageArrivedComplete was called and no exception was handled
107+
verify(mockClient).messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
108+
verifyNoInteractions(mockExceptionHandler);
109+
}
110+
111+
@Test
112+
public void testNoCompletionRegisteredWhenManualAcksDisabled() throws Exception {
113+
PahoMqtt5Endpoint endpoint = context.getEndpoint(
114+
"paho-mqtt5:test?brokerUrl=tcp://localhost:1883", PahoMqtt5Endpoint.class);
115+
116+
Processor mockProcessor = mock(Processor.class);
117+
PahoMqtt5Consumer consumer = new PahoMqtt5Consumer(endpoint, mockProcessor);
118+
119+
MqttClient mockClient = mock(MqttClient.class);
120+
consumer.setClient(mockClient);
121+
122+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
123+
124+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
125+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
126+
127+
// No synchronization should be registered when manualAcks is disabled
128+
assertTrue(completions == null || completions.isEmpty());
129+
}
130+
}

components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,15 @@ public void onComplete(Exchange exchange) {
154154
try {
155155
PahoConsumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
156156
} catch (MqttException e) {
157-
throw new RuntimeException(e);
157+
getExceptionHandler().handleException(
158+
"Error acknowledging MQTT message with ID: " + mqttMessage.getId(),
159+
exchange, e);
158160
}
159161
}
160162

161163
@Override
162164
public void onFailure(Exchange exchange) {
163-
LOG.error("Rollback due to error processing Exchange ID: {}", exchange.getExchangeId(),
165+
LOG.debug("Rollback due to error processing Exchange ID: {}", exchange.getExchangeId(),
164166
exchange.getException());
165167
}
166168
});
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.component.paho;
18+
19+
import java.util.List;
20+
21+
import org.apache.camel.Exchange;
22+
import org.apache.camel.Processor;
23+
import org.apache.camel.spi.ExceptionHandler;
24+
import org.apache.camel.spi.Synchronization;
25+
import org.apache.camel.test.junit6.CamelTestSupport;
26+
import org.eclipse.paho.client.mqttv3.MqttClient;
27+
import org.eclipse.paho.client.mqttv3.MqttException;
28+
import org.eclipse.paho.client.mqttv3.MqttMessage;
29+
import org.junit.jupiter.api.Test;
30+
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertFalse;
33+
import static org.junit.jupiter.api.Assertions.assertNotNull;
34+
import static org.junit.jupiter.api.Assertions.assertTrue;
35+
import static org.mockito.ArgumentMatchers.any;
36+
import static org.mockito.ArgumentMatchers.contains;
37+
import static org.mockito.ArgumentMatchers.eq;
38+
import static org.mockito.Mockito.doThrow;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.verify;
41+
import static org.mockito.Mockito.verifyNoInteractions;
42+
43+
public class PahoConsumerManualAckExceptionTest extends CamelTestSupport {
44+
45+
@Test
46+
public void testOnCompleteCallsExceptionHandlerOnMqttException() throws Exception {
47+
PahoEndpoint endpoint = context.getEndpoint(
48+
"paho:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoEndpoint.class);
49+
50+
Processor mockProcessor = mock(Processor.class);
51+
PahoConsumer consumer = new PahoConsumer(endpoint, mockProcessor);
52+
53+
MqttClient mockClient = mock(MqttClient.class);
54+
doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION))
55+
.when(mockClient).messageArrivedComplete(any(int.class), any(int.class));
56+
consumer.setClient(mockClient);
57+
58+
ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class);
59+
consumer.setExceptionHandler(mockExceptionHandler);
60+
61+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
62+
mqttMessage.setQos(2);
63+
64+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
65+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
66+
67+
assertNotNull(completions);
68+
assertFalse(completions.isEmpty());
69+
70+
// Trigger the onComplete callback (simulating successful exchange processing)
71+
completions.get(0).onComplete(exchange);
72+
73+
// Verify exception handler was called instead of throwing RuntimeException
74+
verify(mockExceptionHandler).handleException(
75+
contains("Error acknowledging MQTT message"),
76+
eq(exchange),
77+
any(MqttException.class));
78+
}
79+
80+
@Test
81+
public void testOnCompleteAcknowledgesSuccessfully() throws Exception {
82+
PahoEndpoint endpoint = context.getEndpoint(
83+
"paho:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoEndpoint.class);
84+
85+
Processor mockProcessor = mock(Processor.class);
86+
PahoConsumer consumer = new PahoConsumer(endpoint, mockProcessor);
87+
88+
MqttClient mockClient = mock(MqttClient.class);
89+
consumer.setClient(mockClient);
90+
91+
ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class);
92+
consumer.setExceptionHandler(mockExceptionHandler);
93+
94+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
95+
mqttMessage.setQos(2);
96+
97+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
98+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
99+
100+
assertNotNull(completions);
101+
assertEquals(1, completions.size());
102+
103+
// Trigger the onComplete callback
104+
completions.get(0).onComplete(exchange);
105+
106+
// Verify messageArrivedComplete was called and no exception was handled
107+
verify(mockClient).messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
108+
verifyNoInteractions(mockExceptionHandler);
109+
}
110+
111+
@Test
112+
public void testNoCompletionRegisteredWhenManualAcksDisabled() throws Exception {
113+
PahoEndpoint endpoint = context.getEndpoint(
114+
"paho:test?brokerUrl=tcp://localhost:1883", PahoEndpoint.class);
115+
116+
Processor mockProcessor = mock(Processor.class);
117+
PahoConsumer consumer = new PahoConsumer(endpoint, mockProcessor);
118+
119+
MqttClient mockClient = mock(MqttClient.class);
120+
consumer.setClient(mockClient);
121+
122+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
123+
124+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
125+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
126+
127+
// No synchronization should be registered when manualAcks is disabled
128+
assertTrue(completions == null || completions.isEmpty());
129+
}
130+
}

0 commit comments

Comments
 (0)