Fetched committed offsets should be validated before starting to consume from them #4931

Draft
wants to merge 9 commits into base: master
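For context, the flow this PR targets, as a minimal standalone sketch (illustrative C only, not librdkafka internals; all names are assumptions): the consumer fetches its committed offset together with the leader epoch it was committed under (KIP-320), and validates that position against the current leader via OffsetForLeaderEpoch before it resumes fetching. If the leader's end offset for that epoch is lower than the committed offset, log truncation has occurred and the position is adjusted.

```c
#include <inttypes.h>
#include <stdio.h>

/* Illustrative committed position: an offset plus the leader epoch it was
 * committed under (-1 when the epoch is unknown). */
typedef struct {
        int64_t offset;
        int32_t leader_epoch;
} fetch_pos_t;

/* `epoch_end_offset` stands for the OffsetForLeaderEpoch answer: roughly,
 * the end offset the current leader has for the committed epoch.
 * Returns the position fetching should actually start from. */
static fetch_pos_t validate_committed_pos(fetch_pos_t committed,
                                          int64_t epoch_end_offset) {
        if (committed.leader_epoch == -1 || epoch_end_offset < 0)
                return committed; /* Nothing to validate against */

        if (epoch_end_offset < committed.offset) {
                /* Log truncation: the leader no longer has the committed
                 * offset for that epoch, so start from what it does have. */
                fetch_pos_t truncated = {epoch_end_offset,
                                         committed.leader_epoch};
                return truncated;
        }
        return committed; /* Committed position is still valid */
}

int main(void) {
        fetch_pos_t committed = {100, 3};
        fetch_pos_t pos       = validate_committed_pos(committed, 80);
        printf("start fetching at offset %" PRId64 "\n", pos.offset);
        return 0;
}
```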
26 changes: 26 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,29 @@
# librdkafka v2.6.2

librdkafka v2.6.2 is a maintenance release:

* Fixes to allow migrating partitions to leaders with the same leader epoch,
or a NULL leader epoch (#4901).


## Fixes

### Consumer fixes

* Issues: #4796.
Fix to allow migrating partitions to leaders with a NULL leader epoch.
A NULL leader epoch can happen during a cluster roll with an upgrade to a
version supporting KIP-320.
Happening since v2.1.0 (#4901).
* Issues: #4804.
Fix to allow migrating partitions to leaders with the same leader epoch.
The same leader epoch can happen when a partition is
temporarily migrated to the internal broker (#4804), or if the broker
implementation never bumps it, as it isn't required for offset validation.
Happening since v2.4.0 (#4901).



# librdkafka v2.6.1

librdkafka v2.6.1 is a maintenance release:
3 changes: 2 additions & 1 deletion src/rdkafka_metadata.c
@@ -2067,7 +2067,8 @@ rd_kafka_metadata_update_op(rd_kafka_t *rk, rd_kafka_metadata_internal_t *mdi) {
.partitions[part]
.leader_epoch;

if (current_leader_epoch >= mdpi->leader_epoch) {
if (mdpi->leader_epoch != -1 &&
current_leader_epoch > mdpi->leader_epoch) {
rd_kafka_broker_destroy(rkb);
rd_kafka_dbg(
rk, METADATA, "METADATAUPDATE",
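As a rough illustration of the relaxed check above (a sketch, not the actual librdkafka code path): a metadata update is no longer discarded when the new leader epoch equals the cached one, and a NULL (-1) epoch is never treated as outdated.

```c
#include <stdbool.h>
#include <stdint.h>

/* Returns true if the incoming metadata update should be ignored as
 * outdated. -1 means the broker did not provide a leader epoch. */
static bool metadata_update_is_outdated(int32_t current_epoch,
                                        int32_t new_epoch) {
        /* Previous behaviour (current_epoch >= new_epoch) also dropped
         * updates with an equal epoch and updates carrying a NULL (-1)
         * epoch; now only a strictly older, known epoch is dropped. */
        return new_epoch != -1 && current_epoch > new_epoch;
}
```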
9 changes: 5 additions & 4 deletions src/rdkafka_mock.c
@@ -538,7 +538,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
rd_kafka_mock_committed_offset_t *
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
const rd_kafkap_str_t *group,
int64_t offset,
rd_kafka_fetch_pos_t pos,
const rd_kafkap_str_t *metadata) {
rd_kafka_mock_committed_offset_t *coff;

@@ -561,12 +561,13 @@ rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,

coff->metadata = rd_kafkap_str_copy(metadata);

coff->offset = offset;
coff->pos = pos;

rd_kafka_dbg(mpart->topic->cluster->rk, MOCK, "MOCK",
"Topic %s [%" PRId32 "] committing offset %" PRId64
"Topic %s [%" PRId32
"] committing offset %s"
" for group %.*s",
mpart->topic->name, mpart->id, offset,
mpart->topic->name, mpart->id, rd_kafka_fetch_pos2str(pos),
RD_KAFKAP_STR_PR(group));

return coff;
37 changes: 27 additions & 10 deletions src/rdkafka_mock_handlers.c
@@ -290,13 +290,26 @@ void rd_kafka_mock_Fetch_reply_tags_partition_write(
rd_kafka_mock_partition_t *mpart) {
switch (tagtype) {
case 1: /* CurrentLeader */
{
int32_t leader_id = mpart->leader->id,
leader_epoch = mpart->leader_epoch;
rd_kafka_mock_partition_leader_t *mpart_leader =
rd_kafka_mock_partition_next_leader_response(mpart);
if (mpart_leader) {
leader_id = mpart_leader->leader_id;
leader_epoch = mpart_leader->leader_epoch;
rd_kafka_mock_partition_leader_destroy(mpart,
mpart_leader);
}

/* Leader id */
rd_kafka_buf_write_i32(rkbuf, mpart->leader->id);
rd_kafka_buf_write_i32(rkbuf, leader_id);
/* Leader epoch */
rd_kafka_buf_write_i32(rkbuf, mpart->leader_epoch);
rd_kafka_buf_write_i32(rkbuf, leader_epoch);
/* Field tags */
rd_kafka_buf_write_tags_empty(rkbuf);
break;
}
default:
break;
}
@@ -917,12 +930,13 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
mpart, &GroupId);

/* Response: CommittedOffset */
rd_kafka_buf_write_i64(resp, coff ? coff->offset : -1);
rd_kafka_buf_write_i64(resp,
coff ? coff->pos.offset : -1);

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 5) {
/* Response: CommittedLeaderEpoch */
rd_kafka_buf_write_i32(
resp, mpart ? mpart->leader_epoch : -1);
resp, coff ? coff->pos.leader_epoch : -1);
}

/* Response: Metadata */
@@ -939,10 +953,11 @@ static int rd_kafka_mock_handle_OffsetFetch(rd_kafka_mock_connection_t *mconn,
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Topic %s [%" PRId32
"] returning "
"committed offset %" PRId64
"committed offset %s"
" for group %s",
mtopic->name, mpart->id,
coff->offset, coff->group);
rd_kafka_fetch_pos2str(coff->pos),
coff->group);
else
rd_kafka_dbg(mcluster->rk, MOCK, "MOCK",
"Topic %.*s [%" PRId32
@@ -1070,6 +1085,7 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
rd_kafka_mock_partition_t *mpart = NULL;
rd_kafka_resp_err_t err = all_err;
int64_t CommittedOffset;
int32_t CommittedLeaderEpoch = -1;
rd_kafkap_str_t Metadata;

rd_kafka_buf_read_i32(rkbuf, &Partition);
@@ -1087,7 +1103,6 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_read_i64(rkbuf, &CommittedOffset);

if (rkbuf->rkbuf_reqhdr.ApiVersion >= 6) {
int32_t CommittedLeaderEpoch;
rd_kafka_buf_read_i32(rkbuf,
&CommittedLeaderEpoch);

@@ -1106,9 +1121,11 @@ static int rd_kafka_mock_handle_OffsetCommit(rd_kafka_mock_connection_t *mconn,
rd_kafka_buf_skip_tags(rkbuf);

if (!err)
rd_kafka_mock_commit_offset(mpart, &GroupId,
CommittedOffset,
&Metadata);
rd_kafka_mock_commit_offset(
mpart, &GroupId,
RD_KAFKA_FETCH_POS(CommittedOffset,
CommittedLeaderEpoch),
&Metadata);

/* Response: ErrorCode */
rd_kafka_buf_write_i16(resp, err);
4 changes: 2 additions & 2 deletions src/rdkafka_mock_int.h
@@ -231,7 +231,7 @@ typedef struct rd_kafka_mock_committed_offset_s {
/**< mpart.committed_offsets */
TAILQ_ENTRY(rd_kafka_mock_committed_offset_s) link;
char *group; /**< Allocated along with the struct */
int64_t offset; /**< Committed offset */
rd_kafka_fetch_pos_t pos; /**< Committed position */
rd_kafkap_str_t *metadata; /**< Metadata, allocated separately */
} rd_kafka_mock_committed_offset_t;

@@ -481,7 +481,7 @@ rd_kafka_mock_committed_offset_find(const rd_kafka_mock_partition_t *mpart,
rd_kafka_mock_committed_offset_t *
rd_kafka_mock_commit_offset(rd_kafka_mock_partition_t *mpart,
const rd_kafkap_str_t *group,
int64_t offset,
rd_kafka_fetch_pos_t pos,
const rd_kafkap_str_t *metadata);

const rd_kafka_mock_msgset_t *
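With these changes the mock cluster keeps the leader epoch alongside each committed offset, similar to what a real broker stores per group and partition since KIP-320, so OffsetFetch can return it for validation. A hedged sketch of the idea (illustrative names, not the mock API):

```c
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Illustrative committed-offset record: the offset alone cannot be used to
 * detect log truncation later, so the leader epoch at commit time is kept
 * with it and echoed back in OffsetFetch responses (v5+). */
typedef struct committed_offset_s {
        char *group;
        int64_t offset;
        int32_t leader_epoch; /* -1 if the committing client sent none */
        char *metadata;
} committed_offset_t;

static committed_offset_t *commit_offset(const char *group, int64_t offset,
                                         int32_t leader_epoch,
                                         const char *metadata) {
        committed_offset_t *c = calloc(1, sizeof(*c));
        c->group        = strdup(group);
        c->offset       = offset;
        c->leader_epoch = leader_epoch;
        c->metadata     = metadata ? strdup(metadata) : NULL;
        return c;
}
```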
38 changes: 33 additions & 5 deletions src/rdkafka_offset.c
@@ -900,7 +900,21 @@ static void rd_kafka_offset_validate_tmr_cb(rd_kafka_timers_t *rkts,
rd_kafka_toppar_t *rktp = arg;

rd_kafka_toppar_lock(rktp);
rd_kafka_offset_validate(rktp, "retrying offset validation");
/* Retry validation only when it's still needed.
* Even though validation can be started in fetch states ACTIVE and
* VALIDATE_EPOCH_WAIT, the retry should be done only
* in fetch state VALIDATE_EPOCH_WAIT. */
if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
rd_kafka_offset_validate(rktp, "retrying offset validation");
else {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE",
"%.*s [%" PRId32
"]: skipping offset "
"validation retry in fetch state %s",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_fetch_states[rktp->rktp_fetch_state]);
}
rd_kafka_toppar_unlock(rktp);
}

@@ -923,6 +937,9 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,
rd_kafka_topic_partition_t *rktpar;
int64_t end_offset;
int32_t end_offset_leader_epoch;
rd_kafka_toppar_lock(rktp);
rktp->rktp_flags &= ~RD_KAFKA_TOPPAR_F_VALIDATING;
rd_kafka_toppar_unlock(rktp);

if (err == RD_KAFKA_RESP_ERR__DESTROY) {
rd_kafka_toppar_destroy(rktp); /* Drop refcnt */
@@ -1142,12 +1159,10 @@ void rd_kafka_offset_validate(rd_kafka_toppar_t *rktp, const char *fmt, ...) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE",
"%.*s [%" PRId32
"]: unable to perform offset "
"validation: partition leader not available",
"validation: partition leader not available. "
"Retrying when available",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition);

rd_kafka_toppar_set_fetch_state(rktp,
RD_KAFKA_TOPPAR_FETCH_ACTIVE);
return;
}

@@ -1169,8 +1184,21 @@
return;
}

if (rktp->rktp_flags & RD_KAFKA_TOPPAR_F_VALIDATING) {
rd_kafka_dbg(
rktp->rktp_rkt->rkt_rk, FETCH, "VALIDATE",
"%.*s [%" PRId32
"]: skipping offset "
"validation for %s: validation is already ongoing",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition,
rd_kafka_fetch_pos2str(rktp->rktp_offset_validation_pos));
return;
}

rd_kafka_toppar_set_fetch_state(
rktp, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT);
rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_VALIDATING;

/* Construct and send OffsetForLeaderEpochRequest */
parts = rd_kafka_topic_partition_list_new(1);
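The new RD_KAFKA_TOPPAR_F_VALIDATING flag acts as an in-flight guard: it is set when the OffsetForLeaderEpoch request is sent and cleared in its response handler, so at most one validation request is outstanding per partition and the retry timer does not pile up duplicates. A minimal sketch of that guard pattern (illustrative and single-threaded; the real code does this under the toppar lock):

```c
#include <stdbool.h>
#include <stdio.h>

#define F_VALIDATING 0x4000

typedef struct {
        int flags;
} partition_t;

/* Start a validation round unless one is already in flight. */
static bool start_validation(partition_t *p) {
        if (p->flags & F_VALIDATING) {
                printf("validation already ongoing, skipping\n");
                return false;
        }
        p->flags |= F_VALIDATING;
        /* ... send OffsetForLeaderEpoch request here ... */
        return true;
}

/* Called from the response handler, regardless of the outcome. */
static void validation_done(partition_t *p) {
        p->flags &= ~F_VALIDATING;
}

int main(void) {
        partition_t p = {0};
        start_validation(&p); /* sends the request */
        start_validation(&p); /* skipped: already in flight */
        validation_done(&p);  /* a later retry may start a new round */
        return 0;
}
```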
2 changes: 2 additions & 0 deletions src/rdkafka_partition.h
@@ -433,6 +433,8 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
#define RD_KAFKA_TOPPAR_F_ASSIGNED \
0x2000 /**< Toppar is part of the consumer \
* assignment. */
#define RD_KAFKA_TOPPAR_F_VALIDATING \
0x4000 /**< Toppar is currently requesting validation. */

/*
* Timers
102 changes: 48 additions & 54 deletions src/rdkafka_topic.c
@@ -663,7 +663,8 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
int32_t leader_epoch) {
rd_kafka_toppar_t *rktp;
rd_bool_t need_epoch_validation = rd_false;
int r = 0;
rd_bool_t fetching_from_follower;
int r = 0;

rktp = rd_kafka_toppar_get(rkt, partition, 0);
if (unlikely(!rktp)) {
@@ -681,7 +682,7 @@

rd_kafka_toppar_lock(rktp);

if (leader_epoch < rktp->rktp_leader_epoch) {
if (leader_epoch != -1 && leader_epoch < rktp->rktp_leader_epoch) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"%s [%" PRId32
"]: ignoring outdated metadata update with "
@@ -691,68 +692,61 @@
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, leader_epoch,
rktp->rktp_leader_epoch);
if (rktp->rktp_fetch_state !=
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) {
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(rktp); /* from get() */
return 0;
}
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(rktp); /* from get() */
return 0;
}

if (rktp->rktp_leader_epoch == -1 ||
leader_epoch > rktp->rktp_leader_epoch) {
rd_bool_t fetching_from_follower;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"%s [%" PRId32 "]: leader %" PRId32
" epoch %" PRId32 " -> leader %" PRId32
" epoch %" PRId32,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, rktp->rktp_leader_id,
rktp->rktp_leader_epoch, leader_id, leader_epoch);
if (leader_epoch > rktp->rktp_leader_epoch)
rktp->rktp_leader_epoch = leader_epoch;
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"%s [%" PRId32 "]: leader %" PRId32 " epoch %" PRId32
" -> leader %" PRId32 " epoch %" PRId32,
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition,
rktp->rktp_leader_id, rktp->rktp_leader_epoch, leader_id,
leader_epoch);

if (leader_epoch > rktp->rktp_leader_epoch ||
rktp->rktp_fetch_state ==
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) {
/* The epoch increased and needs to be validated (when leader_epoch > -1),
* or a pending validation needs to be completed. */
need_epoch_validation = rd_true;
}

rktp->rktp_leader_epoch = leader_epoch;

fetching_from_follower =
leader != NULL && rktp->rktp_broker != NULL &&
rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL &&
rktp->rktp_broker != leader;

if (fetching_from_follower &&
rktp->rktp_leader_id == leader_id) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"Topic %s [%" PRId32 "]: leader %" PRId32
" unchanged, "
"not migrating away from preferred "
"replica %" PRId32,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, leader_id,
rktp->rktp_broker_id);
r = 0;
fetching_from_follower =
leader != NULL && rktp->rktp_broker != NULL &&
rktp->rktp_broker->rkb_source != RD_KAFKA_INTERNAL &&
rktp->rktp_broker != leader;

} else {
if (fetching_from_follower && rktp->rktp_leader_id == leader_id) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"Topic %s [%" PRId32 "]: leader %" PRId32
" unchanged, "
"not migrating away from preferred "
"replica %" PRId32,
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition, leader_id,
rktp->rktp_broker_id);
r = 0;

if (rktp->rktp_leader_id != leader_id ||
rktp->rktp_leader != leader) {
/* Update leader if it has changed */
rktp->rktp_leader_id = leader_id;
if (rktp->rktp_leader)
rd_kafka_broker_destroy(
rktp->rktp_leader);
if (leader)
rd_kafka_broker_keep(leader);
rktp->rktp_leader = leader;
}
} else {

/* Update handling broker */
r = rd_kafka_toppar_broker_update(
rktp, leader_id, leader, "leader updated");
if (rktp->rktp_leader_id != leader_id ||
rktp->rktp_leader != leader) {
/* Update leader if it has changed */
rktp->rktp_leader_id = leader_id;
if (rktp->rktp_leader)
rd_kafka_broker_destroy(rktp->rktp_leader);
if (leader)
rd_kafka_broker_keep(leader);
rktp->rktp_leader = leader;
}

} else if (rktp->rktp_fetch_state ==
RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
need_epoch_validation = rd_true;
/* Update handling broker */
r = rd_kafka_toppar_broker_update(rktp, leader_id, leader,
"leader updated");
}

if (need_epoch_validation) {
/* Set offset validation position,
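A condensed sketch of the decision the rewritten leader-update path makes (illustrative only, not the actual rd_kafka_toppar_leader_update()): a metadata update with a known but older epoch is only useful while the partition is still waiting to complete validation, and validation is (re)triggered when the epoch increases or when such a round is still pending.

```c
#include <stdbool.h>
#include <stdint.h>

typedef enum { FETCH_ACTIVE, FETCH_VALIDATE_EPOCH_WAIT } fetch_state_t;

/* Returns true when the update should be applied; sets *need_validation
 * when the position must be validated before fetching resumes. */
static bool apply_leader_update(int32_t current_epoch, int32_t new_epoch,
                                fetch_state_t state, bool *need_validation) {
        *need_validation = false;

        if (new_epoch != -1 && new_epoch < current_epoch &&
            state != FETCH_VALIDATE_EPOCH_WAIT)
                return false; /* Outdated metadata: ignore it */

        if (new_epoch > current_epoch || state == FETCH_VALIDATE_EPOCH_WAIT)
                *need_validation = true; /* Epoch bumped, or a validation
                                          * round still needs to finish */
        return true;
}
```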