Skip to content

Commit 3da69c4

Browse files
committed
Make late acknowledge
1 parent 73ab6f6 commit 3da69c4

2 files changed

Lines changed: 14 additions & 39 deletions

File tree

oslo_messaging/_drivers/amqpdriver.py

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -207,20 +207,11 @@ def heartbeat(self):
207207
raise MessageDeliveryFailure(
208208
"Heartbeat send failed. Missing exchange")
209209

210-
# NOTE(sileht): Those have already be ack in RpcListener IO thread
211-
# We keep them as noop until all drivers do the same
212-
def acknowledge(self):
213-
pass
214-
215-
def requeue(self):
216-
pass
217-
218-
219-
class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
220210
def acknowledge(self):
221211
def _do_ack():
222212
try:
223213
self.message.acknowledge()
214+
LOG.info("Received message msg_id: %s has been acknowledged", self.msg_id)
224215
except Exception as exc:
225216
# NOTE(kgiusti): this failure is likely due to a loss of the
226217
# connection to the broker. Not much we can do in this case,
@@ -229,6 +220,7 @@ def _do_ack():
229220
# (unacked msg is returned to the queue by the broker), but the
230221
# driver tries to catch that using the msg_id_cache.
231222
LOG.warning("Failed to acknowledge received message: %s", exc)
223+
232224
self._message_operations_handler.do(_do_ack)
233225
self.listener.msg_id_cache.add(self.unique_id)
234226

@@ -242,11 +234,17 @@ def requeue(self):
242234
def _do_requeue():
243235
try:
244236
self.message.requeue()
237+
LOG.info("Received message msg_id: %s has been requeued", self.msg_id)
245238
except Exception as exc:
246239
LOG.warning("Failed to requeue received message: %s", exc)
240+
247241
self._message_operations_handler.do(_do_requeue)
248242

249243

244+
class NotificationAMQPIncomingMessage(AMQPIncomingMessage):
245+
pass
246+
247+
250248
class ObsoleteReplyQueuesCache(object):
251249
"""Cache of reply queue id that doesn't exist anymore.
252250
@@ -399,31 +397,10 @@ class RpcAMQPListener(AMQPListener):
399397
use_cache = True
400398

401399
def __call__(self, message):
402-
# NOTE(kgiusti): In the original RPC implementation the RPC server
403-
# would acknowledge the request THEN process it. The goal of this was
404-
# to prevent duplication if the ack failed. Should the ack fail the
405-
# request would be discarded since the broker would not remove the
406-
# request from the queue since no ack was received. That would lead to
407-
# the request being redelivered at some point. However this approach
408-
# meant that the ack was issued from the dispatch thread, not the
409-
# consumer thread, which is bad since kombu is not thread safe. So a
410-
# change was made to schedule the ack to be sent on the consumer thread
411-
# - breaking the ability to catch ack errors before dispatching the
412-
# request. To fix this we do the actual ack here in the consumer
413-
# callback and avoid the upcall if the ack fails. See
414-
# https://bugs.launchpad.net/oslo.messaging/+bug/1695746
415-
# for all the gory details...
416-
try:
417-
message.acknowledge()
418-
except Exception as exc:
419-
LOG.warning("Discarding RPC request due to failed acknowledge: %s",
420-
exc)
421-
else:
422-
# NOTE(kgiusti): be aware that even if the acknowledge call
423-
# succeeds there is no guarantee the broker actually gets the ACK
424-
# since acknowledge() simply writes the ACK to the socket (there is
425-
# no ACK confirmation coming back from the broker)
426-
super(RpcAMQPListener, self).__call__(message)
400+
# NOTE(andrvb): The acknowledgment of the request is deferred until
401+
# after the request has been processed. This provides an at-least-once
402+
# delivery semantic. See RPCServer._process_incoming.
403+
super(RpcAMQPListener, self).__call__(message)
427404

428405

429406
class NotificationAMQPListener(AMQPListener):

oslo_messaging/rpc/server.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,10 +152,6 @@ def _create_listener(self):
152152
def _process_incoming(self, incoming):
153153
message = incoming[0]
154154

155-
# TODO(sileht): We should remove that at some point and do
156-
# this directly in the driver
157-
message.acknowledge()
158-
159155
failure = None
160156
try:
161157
res = self.dispatcher.dispatch(message)
@@ -194,6 +190,8 @@ def _process_incoming(self, incoming):
194190
# exc_info.
195191
del failure
196192

193+
message.acknowledge()
194+
197195

198196
def get_rpc_server(transport, target, endpoints,
199197
executor=None, serializer=None, access_policy=None):

0 commit comments

Comments
 (0)