Skip to content

Commit

Permalink
Merge pull request #45 from praekeltfoundation/dont-double-reject-mes…
Browse files Browse the repository at this point in the history
…sage

Don't double reject messages
  • Loading branch information
rudigiesler authored Jan 3, 2023
2 parents de9479e + 18be59d commit 030e07e
Showing 1 changed file with 17 additions and 17 deletions.
34 changes: 17 additions & 17 deletions vxwhatsapp/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,26 @@ async def process_message(self, message: IncomingMessage):
await message.reject(requeue=False)
return

async with message.process():
logger.debug(f"Processing outbound message {msg}")
try:
await self.submit_message(msg)
except aiohttp.ClientResponseError as e:
# If it's a retryable upstream error, requeue the message
if e.status > 499:
await message.reject(requeue=True)
# If we've retried this before, wait before retrying again
if message.redelivered:
await sleep(0.5)
return

logger.debug(f"Processing outbound message {msg}")
try:
await self.submit_message(msg)
except aiohttp.ClientResponseError as e:
# If it's a retryable upstream error, requeue the message
if e.status > 499:
await message.reject(requeue=True)
# If we've retried this before, wait before retrying again
if message.redelivered:
await sleep(0.5)
else:
# Otherwise log the error and reject
logger.exception(f"Upstream HTTP error processing {msg}")
await message.reject(requeue=False)
except Exception:
# Any other errors aren't recoverable, so log and reject
logger.exception(f"Error processing {msg}")
await message.reject(requeue=False)
except Exception:
# Any other errors aren't recoverable, so log and reject
logger.exception(f"Error processing {msg}")
await message.reject(requeue=False)
else:
await message.ack()

async def get_media_id(self, media_url):
if media_url in self.media_cache:
Expand Down

0 comments on commit 030e07e

Please sign in to comment.