Skip to content

Commit

Permalink
Don't double reject messages
Browse files Browse the repository at this point in the history
In a `message.process` block, the message will automatically be ack'd or rejected depending on whether an exception is thrown or not.

Since we sometimes want to reject with requeue, and sometimes without, we cannot use this mechanism, so rather handle it manually
  • Loading branch information
rudigiesler committed Jan 3, 2023
1 parent de9479e commit 18be59d
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions vxwhatsapp/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,26 @@ async def process_message(self, message: IncomingMessage):
await message.reject(requeue=False)
return

async with message.process():
logger.debug(f"Processing outbound message {msg}")
try:
await self.submit_message(msg)
except aiohttp.ClientResponseError as e:
# If it's a retryable upstream error, requeue the message
if e.status > 499:
await message.reject(requeue=True)
# If we've retried this before, wait before retrying again
if message.redelivered:
await sleep(0.5)
return

logger.debug(f"Processing outbound message {msg}")
try:
await self.submit_message(msg)
except aiohttp.ClientResponseError as e:
# If it's a retryable upstream error, requeue the message
if e.status > 499:
await message.reject(requeue=True)
# If we've retried this before, wait before retrying again
if message.redelivered:
await sleep(0.5)
else:
# Otherwise log the error and reject
logger.exception(f"Upstream HTTP error processing {msg}")
await message.reject(requeue=False)
except Exception:
# Any other errors aren't recoverable, so log and reject
logger.exception(f"Error processing {msg}")
await message.reject(requeue=False)
except Exception:
# Any other errors aren't recoverable, so log and reject
logger.exception(f"Error processing {msg}")
await message.reject(requeue=False)
else:
await message.ack()

async def get_media_id(self, media_url):
if media_url in self.media_cache:
Expand Down

0 comments on commit 18be59d

Please sign in to comment.