Skip to content

Commit 1165776

Browse files
nkokitkarCopilot
andauthored
MQTT: Throw Runtime Exception when manual ack fails (#22117)
* Throw Runtime Exception when manual ack fails * Resolve Pr comments * Fix: restore error log level in onFailure callback Revert the log level in onFailure from debug back to error, as this callback handles exchange processing failures which should not be silently swallowed at debug level. Addresses review comment from @Croway. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent e6d8eba commit 1165776

4 files changed

Lines changed: 266 additions & 2 deletions

File tree

components/camel-paho-mqtt5/src/main/java/org/apache/camel/component/paho/mqtt5/PahoMqtt5Consumer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ public void onComplete(Exchange exchange) {
167167
try {
168168
PahoMqtt5Consumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
169169
} catch (MqttException e) {
170-
LOG.warn("Failed to commit message with ID: {} due to {}", mqttMessage.getId(), e.getMessage(), e);
170+
getExceptionHandler().handleException(
171+
"Error acknowledging MQTT message with ID: " + mqttMessage.getId(),
172+
exchange, e);
171173
}
172174
}
173175

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.component.paho.mqtt5;
18+
19+
import java.util.List;
20+
21+
import org.apache.camel.Exchange;
22+
import org.apache.camel.Processor;
23+
import org.apache.camel.spi.ExceptionHandler;
24+
import org.apache.camel.spi.Synchronization;
25+
import org.apache.camel.test.junit6.CamelTestSupport;
26+
import org.eclipse.paho.mqttv5.client.MqttClient;
27+
import org.eclipse.paho.mqttv5.common.MqttException;
28+
import org.eclipse.paho.mqttv5.common.MqttMessage;
29+
import org.junit.jupiter.api.Test;
30+
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertFalse;
33+
import static org.junit.jupiter.api.Assertions.assertNotNull;
34+
import static org.junit.jupiter.api.Assertions.assertTrue;
35+
import static org.mockito.ArgumentMatchers.any;
36+
import static org.mockito.ArgumentMatchers.contains;
37+
import static org.mockito.ArgumentMatchers.eq;
38+
import static org.mockito.Mockito.doThrow;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.verify;
41+
import static org.mockito.Mockito.verifyNoInteractions;
42+
43+
public class PahoMqtt5ConsumerManualAckExceptionTest extends CamelTestSupport {
44+
45+
@Test
46+
public void testOnCompleteCallsExceptionHandlerOnMqttException() throws Exception {
47+
PahoMqtt5Endpoint endpoint = context.getEndpoint(
48+
"paho-mqtt5:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoMqtt5Endpoint.class);
49+
50+
Processor mockProcessor = mock(Processor.class);
51+
PahoMqtt5Consumer consumer = new PahoMqtt5Consumer(endpoint, mockProcessor);
52+
53+
MqttClient mockClient = mock(MqttClient.class);
54+
doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION))
55+
.when(mockClient).messageArrivedComplete(any(int.class), any(int.class));
56+
consumer.setClient(mockClient);
57+
58+
ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class);
59+
consumer.setExceptionHandler(mockExceptionHandler);
60+
61+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
62+
mqttMessage.setQos(2);
63+
64+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
65+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
66+
67+
assertNotNull(completions);
68+
assertFalse(completions.isEmpty());
69+
70+
// Trigger the onComplete callback (simulating successful exchange processing)
71+
completions.get(0).onComplete(exchange);
72+
73+
// Verify exception handler was called instead of throwing RuntimeException
74+
verify(mockExceptionHandler).handleException(
75+
contains("Error acknowledging MQTT message"),
76+
eq(exchange),
77+
any(MqttException.class));
78+
}
79+
80+
@Test
81+
public void testOnCompleteAcknowledgesSuccessfully() throws Exception {
82+
PahoMqtt5Endpoint endpoint = context.getEndpoint(
83+
"paho-mqtt5:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoMqtt5Endpoint.class);
84+
85+
Processor mockProcessor = mock(Processor.class);
86+
PahoMqtt5Consumer consumer = new PahoMqtt5Consumer(endpoint, mockProcessor);
87+
88+
MqttClient mockClient = mock(MqttClient.class);
89+
consumer.setClient(mockClient);
90+
91+
ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class);
92+
consumer.setExceptionHandler(mockExceptionHandler);
93+
94+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
95+
mqttMessage.setQos(2);
96+
97+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
98+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
99+
100+
assertNotNull(completions);
101+
assertEquals(1, completions.size());
102+
103+
// Trigger the onComplete callback
104+
completions.get(0).onComplete(exchange);
105+
106+
// Verify messageArrivedComplete was called and no exception was handled
107+
verify(mockClient).messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
108+
verifyNoInteractions(mockExceptionHandler);
109+
}
110+
111+
@Test
112+
public void testNoCompletionRegisteredWhenManualAcksDisabled() throws Exception {
113+
PahoMqtt5Endpoint endpoint = context.getEndpoint(
114+
"paho-mqtt5:test?brokerUrl=tcp://localhost:1883", PahoMqtt5Endpoint.class);
115+
116+
Processor mockProcessor = mock(Processor.class);
117+
PahoMqtt5Consumer consumer = new PahoMqtt5Consumer(endpoint, mockProcessor);
118+
119+
MqttClient mockClient = mock(MqttClient.class);
120+
consumer.setClient(mockClient);
121+
122+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
123+
124+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
125+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
126+
127+
// No synchronization should be registered when manualAcks is disabled
128+
assertTrue(completions == null || completions.isEmpty());
129+
}
130+
}

components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConsumer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,9 @@ public void onComplete(Exchange exchange) {
154154
try {
155155
PahoConsumer.this.client.messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
156156
} catch (MqttException e) {
157-
LOG.warn("Failed to commit message with ID {} due to MqttException.", mqttMessage.getId());
157+
getExceptionHandler().handleException(
158+
"Error acknowledging MQTT message with ID: " + mqttMessage.getId(),
159+
exchange, e);
158160
}
159161
}
160162

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.component.paho;
18+
19+
import java.util.List;
20+
21+
import org.apache.camel.Exchange;
22+
import org.apache.camel.Processor;
23+
import org.apache.camel.spi.ExceptionHandler;
24+
import org.apache.camel.spi.Synchronization;
25+
import org.apache.camel.test.junit6.CamelTestSupport;
26+
import org.eclipse.paho.client.mqttv3.MqttClient;
27+
import org.eclipse.paho.client.mqttv3.MqttException;
28+
import org.eclipse.paho.client.mqttv3.MqttMessage;
29+
import org.junit.jupiter.api.Test;
30+
31+
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertFalse;
33+
import static org.junit.jupiter.api.Assertions.assertNotNull;
34+
import static org.junit.jupiter.api.Assertions.assertTrue;
35+
import static org.mockito.ArgumentMatchers.any;
36+
import static org.mockito.ArgumentMatchers.contains;
37+
import static org.mockito.ArgumentMatchers.eq;
38+
import static org.mockito.Mockito.doThrow;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.verify;
41+
import static org.mockito.Mockito.verifyNoInteractions;
42+
43+
public class PahoConsumerManualAckExceptionTest extends CamelTestSupport {
44+
45+
@Test
46+
public void testOnCompleteCallsExceptionHandlerOnMqttException() throws Exception {
47+
PahoEndpoint endpoint = context.getEndpoint(
48+
"paho:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoEndpoint.class);
49+
50+
Processor mockProcessor = mock(Processor.class);
51+
PahoConsumer consumer = new PahoConsumer(endpoint, mockProcessor);
52+
53+
MqttClient mockClient = mock(MqttClient.class);
54+
doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION))
55+
.when(mockClient).messageArrivedComplete(any(int.class), any(int.class));
56+
consumer.setClient(mockClient);
57+
58+
ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class);
59+
consumer.setExceptionHandler(mockExceptionHandler);
60+
61+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
62+
mqttMessage.setQos(2);
63+
64+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
65+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
66+
67+
assertNotNull(completions);
68+
assertFalse(completions.isEmpty());
69+
70+
// Trigger the onComplete callback (simulating successful exchange processing)
71+
completions.get(0).onComplete(exchange);
72+
73+
// Verify exception handler was called instead of throwing RuntimeException
74+
verify(mockExceptionHandler).handleException(
75+
contains("Error acknowledging MQTT message"),
76+
eq(exchange),
77+
any(MqttException.class));
78+
}
79+
80+
@Test
81+
public void testOnCompleteAcknowledgesSuccessfully() throws Exception {
82+
PahoEndpoint endpoint = context.getEndpoint(
83+
"paho:test?manualAcksEnabled=true&brokerUrl=tcp://localhost:1883", PahoEndpoint.class);
84+
85+
Processor mockProcessor = mock(Processor.class);
86+
PahoConsumer consumer = new PahoConsumer(endpoint, mockProcessor);
87+
88+
MqttClient mockClient = mock(MqttClient.class);
89+
consumer.setClient(mockClient);
90+
91+
ExceptionHandler mockExceptionHandler = mock(ExceptionHandler.class);
92+
consumer.setExceptionHandler(mockExceptionHandler);
93+
94+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
95+
mqttMessage.setQos(2);
96+
97+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
98+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
99+
100+
assertNotNull(completions);
101+
assertEquals(1, completions.size());
102+
103+
// Trigger the onComplete callback
104+
completions.get(0).onComplete(exchange);
105+
106+
// Verify messageArrivedComplete was called and no exception was handled
107+
verify(mockClient).messageArrivedComplete(mqttMessage.getId(), mqttMessage.getQos());
108+
verifyNoInteractions(mockExceptionHandler);
109+
}
110+
111+
@Test
112+
public void testNoCompletionRegisteredWhenManualAcksDisabled() throws Exception {
113+
PahoEndpoint endpoint = context.getEndpoint(
114+
"paho:test?brokerUrl=tcp://localhost:1883", PahoEndpoint.class);
115+
116+
Processor mockProcessor = mock(Processor.class);
117+
PahoConsumer consumer = new PahoConsumer(endpoint, mockProcessor);
118+
119+
MqttClient mockClient = mock(MqttClient.class);
120+
consumer.setClient(mockClient);
121+
122+
MqttMessage mqttMessage = new MqttMessage("test".getBytes());
123+
124+
Exchange exchange = consumer.createExchange(mqttMessage, "test-topic");
125+
List<Synchronization> completions = exchange.getExchangeExtension().handoverCompletions();
126+
127+
// No synchronization should be registered when manualAcks is disabled
128+
assertTrue(completions == null || completions.isEmpty());
129+
}
130+
}

0 commit comments

Comments
 (0)