fix possible pause/resume race
FZambia committed Dec 20, 2024
1 parent 6e046e5 commit be27e5b
Showing 2 changed files with 176 additions and 21 deletions.
55 changes: 46 additions & 9 deletions internal/consuming/kafka.go
@@ -39,18 +39,30 @@ type KafkaConfig struct {

// PartitionBufferSize is the size of the buffer for each partition consumer.
// This is the number of records that can be buffered before the consumer
// will pause fetching records from Kafka. By default, this is 16.
// Set to -1 to use non-buffered channel.
// will pause fetching records from Kafka. By default, this is 8.
// Note: due to the way the consumer works with Kafka partitions, we do not allow
// unbuffered channels for partition consumers, specifically because of the
// pause/resume race condition covered by the TestKafkaConsumer_TestPauseAfterResumeRace
// test case.
PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size"`

// FetchMaxBytes is the maximum number of bytes to fetch from Kafka in a single request.
// If not set the default 50MB is used.
FetchMaxBytes int32 `mapstructure:"fetch_max_bytes" json:"fetch_max_bytes"`

// testOnlyConfig contains additional options used for testing purposes only.
testOnlyConfig testOnlyConfig
}
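
For illustration, a minimal sketch of constructing this config in Go. The broker, topic, and group values are placeholders invented for the example; only field names that appear in this diff are used, and the comment reflects the new semantics (a zero value falls back to the default of 8, negative values are rejected by NewKafkaConsumer).

```go
// Hypothetical wiring, for illustration only.
cfg := KafkaConfig{
	Brokers:       []string{"localhost:9092"}, // placeholder broker
	Topics:        []string{"example-topic"},  // placeholder topic
	ConsumerGroup: "example-group",            // placeholder group
	// 0 means "use the default" (8 after this commit). Unbuffered partition channels
	// are no longer supported, and negative values make NewKafkaConsumer return an error.
	PartitionBufferSize: 0,
}
```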

type testOnlyConfig struct {
topicPartitionBeforePauseCh chan topicPartition
topicPartitionPauseProceedCh chan struct{}
fetchTopicPartitionSubmittedCh chan kgo.FetchTopicPartition
}

type topicPartition struct {
t string
p int32
topic string
partition int32
}

type KafkaConsumer struct {
@@ -110,6 +122,9 @@ func NewKafkaConsumer(name string, nodeID string, logger Logger, dispatcher Disp
if config.MaxPollRecords == 0 {
config.MaxPollRecords = 100
}
if config.PartitionBufferSize < 0 {
return nil, errors.New("partition buffer size can't be negative")
}
consumer := &KafkaConsumer{
name: name,
nodeID: nodeID,
@@ -317,13 +332,37 @@ func (c *KafkaConsumer) pollUntilFatal(ctx context.Context) error {
case <-c.consumers[tp].quit:
return
case c.consumers[tp].recs <- p:
if c.config.testOnlyConfig.fetchTopicPartitionSubmittedCh != nil { // Only set in tests.
c.config.testOnlyConfig.fetchTopicPartitionSubmittedCh <- p
}
default:
if c.config.testOnlyConfig.topicPartitionBeforePauseCh != nil { // Only set in tests.
c.config.testOnlyConfig.topicPartitionBeforePauseCh <- tp
}
if c.config.testOnlyConfig.topicPartitionPauseProceedCh != nil { // Only set in tests.
<-c.config.testOnlyConfig.topicPartitionPauseProceedCh
}

partitionsToPause := map[string][]int32{p.Topic: {p.Partition}}
// PauseFetchPartitions here so that we do not poll this partition until its records
// are processed. This allows parallel processing of records from different partitions
// without keeping records in memory and blocking rebalance. Resume will be called
// after records are processed by c.consumers[tp].
c.client.PauseFetchPartitions(partitionsToPause)
defer func() {
// There is a chance that the message processor resumed partition processing before
// we called PauseFetchPartitions above. Such a race was observed in a service
// under CPU throttling conditions. In that case Pause is called after Resume,
// and the partition is never resumed after that. To avoid this, we check whether the
// channel's current length is less than its capacity: if len < cap, the buffer has
// free space now, so we can safely resume partition processing. Otherwise the records
// are still unprocessed and resume will be called by the partition consumer once it
// finishes processing them. See also the TestKafkaConsumer_TestPauseAfterResumeRace test case.
if len(c.consumers[tp].recs) < cap(c.consumers[tp].recs) {
c.client.ResumeFetchPartitions(partitionsToPause)
}
}()

pausedTopicPartitions[tp] = struct{}{}
// To poll next time since correct offset we need to set it manually to the offset of
// the first record in the batch. Otherwise, next poll will return the next record batch,
@@ -364,13 +403,11 @@ func (c *KafkaConsumer) reInitClient(ctx context.Context) error {
return nil
}

const defaultPartitionBufferSize = 16
const defaultPartitionBufferSize = 8

func (c *KafkaConsumer) assigned(ctx context.Context, cl *kgo.Client, assigned map[string][]int32) {
bufferSize := c.config.PartitionBufferSize
if bufferSize == -1 {
bufferSize = 0
} else if bufferSize == 0 {
if bufferSize == 0 {
bufferSize = defaultPartitionBufferSize
}
for topic, partitions := range assigned {
@@ -501,7 +538,7 @@ func (pc *partitionConsumer) consume() {
return
case p := <-pc.recs:
pc.processRecords(p.Records)
// At this point we are ready to consume the next batch from partition, thus resume.
// After processing records, we can resume partition processing (if it was paused, no-op otherwise).
resumeConsuming()
}
}
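To make the guarded pause easier to see outside the surrounding polling machinery, here is a condensed sketch of the pattern the hunk above implements. It assumes a franz-go *kgo.Client and a buffered per-partition channel; the helper name submitOrPause and the quit channel are illustrative, and the offset rewind and bookkeeping the real code performs after pausing are omitted.

```go
package consuming

import "github.com/twmb/franz-go/pkg/kgo"

// submitOrPause is an illustrative helper (not part of the commit): it tries to hand a
// fetched partition batch to the partition consumer and pauses fetching when the
// consumer is behind, guarding against the pause-after-resume race.
func submitOrPause(cl *kgo.Client, recs chan kgo.FetchTopicPartition, quit chan struct{}, p kgo.FetchTopicPartition) {
	select {
	case <-quit:
		return
	case recs <- p:
		// Buffer had room: the partition consumer resumes the partition itself after
		// it processes this batch.
	default:
		// Buffer is full: stop fetching this partition until the consumer catches up.
		tp := map[string][]int32{p.Topic: {p.Partition}}
		cl.PauseFetchPartitions(tp)
		// Guard from this commit: if the consumer drained the buffer while we were
		// pausing, its Resume may already have happened before our Pause, so undo the
		// stale pause. len < cap can only be true for a buffered channel, which is why
		// unbuffered partition channels are no longer allowed.
		if len(recs) < cap(recs) {
			cl.ResumeFetchPartitions(tp)
		}
	}
}
```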
142 changes: 130 additions & 12 deletions internal/consuming/kafka_test.go
@@ -368,7 +368,7 @@ func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherTopic(t *testing.T) {
// is stuck on it. We want to make sure that the consumer is not blocked and can still process
// messages from other topic partitions.
func TestKafkaConsumer_BlockedPartitionDoesNotBlockAnotherPartition(t *testing.T) {
partitionBufferSizes := []int{-1, 0}
partitionBufferSizes := []int{0, 1}

for _, partitionBufferSize := range partitionBufferSizes {
t.Run(fmt.Sprintf("partition_buffer_size_%d", partitionBufferSize), func(t *testing.T) {
@@ -458,21 +458,32 @@ func TestKafkaConsumer_PausePartitions(t *testing.T) {
consumerClosed := make(chan struct{})
doneCh := make(chan struct{})

fetchSubmittedCh := make(chan kgo.FetchTopicPartition)
beforePauseCh := make(chan topicPartition)

config := KafkaConfig{
Brokers: []string{testKafkaBrokerURL},
Topics: []string{testKafkaTopic},
ConsumerGroup: uuid.New().String(),
PartitionBufferSize: -1,
Brokers: []string{testKafkaBrokerURL},
Topics: []string{testKafkaTopic},
ConsumerGroup: uuid.New().String(),

PartitionBufferSize: 1,

testOnlyConfig: testOnlyConfig{
fetchTopicPartitionSubmittedCh: fetchSubmittedCh,
topicPartitionBeforePauseCh: beforePauseCh,
},
}

numCalls := 0

unblockCh := make(chan struct{})

mockDispatcher := &MockDispatcher{
onDispatch: func(ctx context.Context, method string, data []byte) error {
numCalls++
if numCalls == 1 {
close(event1Received)
time.Sleep(5 * time.Second)
<-unblockCh
return nil
} else if numCalls == 2 {
close(event2Received)
@@ -488,14 +499,22 @@
go func() {
err = produceTestMessage(testKafkaTopic, testPayload1)
require.NoError(t, err)
<-fetchSubmittedCh
<-event1Received
// At this point message 1 is being processed and the next produced message will
// cause a partition pause.

err = produceTestMessage(testKafkaTopic, testPayload2)
require.NoError(t, err)
<-event2Received
<-fetchSubmittedCh

// At this point message 1 is being processed and the next produced message must
// cause a partition pause.
err = produceTestMessage(testKafkaTopic, testPayload3)
require.NoError(t, err)
<-beforePauseCh // Wait for triggering the partition pause.

// Unblock the message processing.
close(unblockCh)
<-fetchSubmittedCh
}()

go func() {
Expand All @@ -521,9 +540,7 @@ func TestKafkaConsumer_WorksCorrectlyInLoadedTopic(t *testing.T) {
numMessages int
partitionBuffer int
}{
//{numPartitions: 1, numMessages: 1000, partitionBuffer: -1},
//{numPartitions: 1, numMessages: 1000, partitionBuffer: 1},
//{numPartitions: 10, numMessages: 10000, partitionBuffer: -1},
//{numPartitions: 1, numMessages: 1000, partitionBuffer: 1}
{numPartitions: 10, numMessages: 10000, partitionBuffer: 1},
}

@@ -606,3 +623,104 @@ func TestKafkaConsumer_WorksCorrectlyInLoadedTopic(t *testing.T) {
})
}
}

// TestKafkaConsumer_TestPauseAfterResumeRace tests a scenario where a partition was
// paused after it was resumed, and the partition never processed any messages after that.
func TestKafkaConsumer_TestPauseAfterResumeRace(t *testing.T) {
t.Parallel()
testKafkaTopic := "consumer_test_" + uuid.New().String()
testPayload1 := []byte(`{"input":"value1"}`)
testPayload2 := []byte(`{"input":"value2"}`)
testPayload3 := []byte(`{"input":"value3"}`)

ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()

err := createTestTopic(ctx, testKafkaTopic, 1, 1)
require.NoError(t, err)

consumerClosed := make(chan struct{})
doneCh := make(chan struct{})

messageCh := make(chan struct{}, 128)

partitionBeforePauseCh := make(chan topicPartition)
partitionPauseProceedCh := make(chan struct{})
fetchSubmittedCh := make(chan kgo.FetchTopicPartition)

config := KafkaConfig{
Brokers: []string{testKafkaBrokerURL},
Topics: []string{testKafkaTopic},
ConsumerGroup: uuid.New().String(),
PartitionBufferSize: 1,

testOnlyConfig: testOnlyConfig{
topicPartitionBeforePauseCh: partitionBeforePauseCh,
topicPartitionPauseProceedCh: partitionPauseProceedCh,
fetchTopicPartitionSubmittedCh: fetchSubmittedCh,
},
}

count := 0
proceedCh := make(chan struct{})
firstMessageReceived := make(chan struct{})

mockDispatcher := &MockDispatcher{
onDispatch: func(ctx context.Context, method string, data []byte) error {
if count == 0 {
close(firstMessageReceived)
// Block until we are allowed to proceed
t.Logf("waiting for proceed")
<-proceedCh
t.Logf("proceeding")
}
count++
messageCh <- struct{}{}
t.Logf("message processed")
return nil
},
}
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, mockDispatcher, config)
require.NoError(t, err)

go func() {
err := consumer.Run(ctx)
require.ErrorIs(t, err, context.Canceled)
close(consumerClosed)
}()

go func() {
err = produceTestMessage(testKafkaTopic, testPayload1)
require.NoError(t, err)

<-fetchSubmittedCh
t.Logf("fetch 1 submitted")

// This one should be buffered.
err = produceTestMessage(testKafkaTopic, testPayload2)
require.NoError(t, err)

<-fetchSubmittedCh
t.Logf("fetch 2 submitted")

// This message pauses the partition consumer.
err = produceTestMessage(testKafkaTopic, testPayload3)
require.NoError(t, err)
<-partitionBeforePauseCh
close(proceedCh)
// Give the consumer some time to process messages, so we can be sure that resume was called.
time.Sleep(time.Second)
// And now we can proceed, so that the partition is paused after resume.
close(partitionPauseProceedCh)
// Wait for the third message to be submitted for processing.
<-fetchSubmittedCh
}()

for i := 0; i < 3; i++ {
<-messageCh
}

cancel()
waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed")
close(doneCh)
}
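
As a complement to the test above, the following Kafka-free sketch shows the same race shape with nothing more than a buffered channel and a fake pause flag. Everything in it (fakePartition, the channel capacity, the timings) is invented for illustration; it only demonstrates why checking len < cap after a failed non-blocking send is enough to undo a pause that lost the race against the consumer's resume.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// fakePartition stands in for the client-side pause/resume state of a single partition.
type fakePartition struct {
	mu     sync.Mutex
	paused bool
}

func (p *fakePartition) pause()  { p.mu.Lock(); p.paused = true; p.mu.Unlock() }
func (p *fakePartition) resume() { p.mu.Lock(); p.paused = false; p.mu.Unlock() }

func (p *fakePartition) isPaused() bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	return p.paused
}

func main() {
	part := &fakePartition{}
	recs := make(chan int, 1) // buffered, as required for the len < cap guard

	// Partition consumer: processes a record, then resumes the partition.
	go func() {
		for range recs {
			time.Sleep(10 * time.Millisecond) // simulate record processing
			part.resume()
		}
	}()

	recs <- 1 // picked up by the consumer and "processed" for 10ms
	recs <- 2 // sits in the buffer while record 1 is processed

	select {
	case recs <- 3:
	default:
		// Buffer is full, so we decide to pause. The consumer may drain recs and call
		// resume() right here, before pause() below runs; the sleep widens that window
		// to make the race deterministic for the demo.
		time.Sleep(50 * time.Millisecond)
		part.pause()
		// The guard from this commit: free buffer space means the consumer already made
		// progress, so its resume() may have happened before our pause(). Undo it.
		if len(recs) < cap(recs) {
			part.resume()
		}
	}

	time.Sleep(100 * time.Millisecond)
	fmt.Println("stuck paused:", part.isPaused()) // false with the guard, true without it
}
```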
