Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support OffsetRequest_v2 #1087

Open
y4n9squared opened this issue Jan 22, 2025 · 0 comments
Open

Support OffsetRequest_v2 #1087

y4n9squared opened this issue Jan 22, 2025 · 0 comments

Comments

@y4n9squared
Copy link
Contributor

Describe the solution you'd like

Currently, AIOKafkaConsumer.end_offsets fires off either an OffsetRequest_v0 or OffsetRequest_v1 message to the broker.

Here, the broker will reply with the high watermark (HWM), but if you need the last stable offset (LSO) instead, you need to supply the isolation_level. This field was added in OffsetRequest_v2.

async def _proc_offset_request(self, node_id, topic_data):
if self._client.api_version < (0, 10, 1):
version = 0
# Version 0 had another field `max_offsets`, set it to `1`
for topic, part_data in topic_data.items():
topic_data[topic] = [(part, ts, 1) for part, ts in part_data]
else:
version = 1
request = OffsetRequest[version](-1, list(topic_data.items()))
response = await self._client.send(node_id, request)

The Fetcher class already has the isolation level of the consumer, so can we switch this API call to use the newer protocol?

For example:

request = OffsetRequest[2](-1, self._isolation_level, list(topic_data.items()))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant