Skip to content

Commit

Permalink
feat(kafka): add hanging break (#402)
Browse files Browse the repository at this point in the history
* feat(kafka): add hanging break

* exit immideately if no messages read

* delete excess import
  • Loading branch information
IlyaFaer authored Mar 18, 2024
1 parent d25b439 commit 9adf9c3
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions sources/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
data is read only up to it, overriding the default Kafka's
behavior of waiting for new messages in endless loop.
"""

from contextlib import closing
from typing import Any, Callable, Dict, Iterable, List, Optional, Union

Expand Down Expand Up @@ -52,7 +53,7 @@ def kafka_consumer(
before it's transfered to the destination.
batch_size (Optional[int]): Messages batch size to read at once.
batch_timeout (Optional[int]): Maximum time to wait for a batch
consume.
consume, in seconds.
start_from (Optional[TAnyDateTime]): A timestamp, at which to start
reading. Older messages are ignored.
Expand Down Expand Up @@ -84,8 +85,12 @@ def kafka_consumer(
# not waiting for new messages
with closing(consumer):
while tracker.has_unread:
messages = consumer.consume(batch_size, timeout=batch_timeout)
if not messages:
break

batch = []
for msg in consumer.consume(batch_size, timeout=batch_timeout):
for msg in messages:
if msg.error():
err = msg.error()
if err.retriable() or not err.fatal():
Expand Down

0 comments on commit 9adf9c3

Please sign in to comment.