How to ignore message.headers (corrupt data chokes consumer) #925
-
Greetings, Some of the upstream publishers (things I don't have control over) are putting corrupt data into message.headers and it's choking the consumer. How can I get things to ignore this? I can't do anything in my code like a try/except block as it's happening before things get to my function. I don't need anything in the headers, so I'm hoping to find a way to just ignore this issue so I can parse out the body. Example error:
Small example of my function that handles this...it never gets to the @router.subscriber(settings.kafka.topic_consume)
async def handle_msg(payload: KafkaMessage):
"""
Primary entry point, consumes a message from kafka, processes it, and will send a notification
if a match is found
"""
logger.debug(f"New Alert: {payload}") |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 1 reply
-
Hi, @ChadDa3mon! In this case you should use custom wrapper |
Beta Was this translation helpful? Give feedback.
Hi, @ChadDa3mon! In this case you should use custom wrapper
You can just wrap original parser calling by try ... except ... and raise RejectMessage exception on error. Also, you can replace broken headers by empty one at exception and retry to serialize it by original parser again.