Skip to content

Commit

Permalink
Merge pull request #855 from allegro/release-0.12.4
Browse files Browse the repository at this point in the history
Release 0.12.4
  • Loading branch information
wojtkiewicz authored Apr 17, 2018
2 parents 079b310 + de29cc7 commit 608ed9c
Show file tree
Hide file tree
Showing 12 changed files with 214 additions and 13 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.12.4 (17.04.2018)

All issues and pull requests: [0.12.4 milestone](https://github.com/allegro/hermes/milestone/41)

## 0.12.3 (11.01.2018)

All issues and pull requests: [0.12.3 milestone](https://github.com/allegro/hermes/milestone/40)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public enum Configs {

CONSUMER_RECEIVER_POOL_TIMEOUT("consumer.receiver.pool.timeout", 100),
CONSUMER_RECEIVER_READ_QUEUE_CAPACITY("consumer.receiver.read.queue.capacity", 1000),
CONSUMER_RETRANSMISSION_QUEUE_CAPACITY("consumer.receiver.retransmission.queue.capacity", 20),

CONSUMER_COMMIT_OFFSET_PERIOD("consumer.commit.offset.period", 60),
CONSUMER_COMMIT_OFFSET_QUEUES_SIZE("consumer.commit.offset.queues.size", 200_000),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

public class RetransmissionException extends HermesException {

public RetransmissionException(String message) {
super(message);
}

public RetransmissionException(Throwable cause) {
super("Error during retransmitting messages.", cause);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ private UnwrappedMessageContent tryDeserializingUsingAnySchemaVersion(byte[] dat
schemaRepository.getAvroSchema(topic, version);
return avroMessageContentWrapper.unwrapContent(data, schema);
} catch (Exception ex) {
logger.debug("Failed to match schema for message for topic {}, schema version {}, fallback to previous.",
topic.getQualifiedName(), version.value());
logger.error("Failed to match schema for message for topic {}, schema version {}, fallback to previous.",
topic.getQualifiedName(), version.value(), ex);
}
}
logger.error("Could not match schema {} for message of topic {} {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

class KafkaConsumerOffsetMover implements PartitionAssigningAwareRetransmitter.OffsetMover {
private KafkaConsumer consumer;

KafkaConsumerOffsetMover(KafkaConsumer consumer) {
this.consumer = consumer;
}

@Override
public void move(SubscriptionPartitionOffset offset) {
try {
consumer.seek(new TopicPartition(offset.getKafkaTopicName().asString(), offset.getPartition()), offset.getOffset());
} catch (IllegalStateException ex) {
/*
Unfortunately we can't differentiate between different kind of illegal states in KafkaConsumer.
What we are really interested in is IllegalStateException with message containing
"No current assignment for partition" but because this message can change in any minor release
we can just do a leap of fate and interpret IllegalStateException as PartitionNotAssignedException.
Throwing this exception should only cause retransmission to be retried after consumer rebalancing.
*/
throw new PartitionNotAssignedException(ex);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.allegro.tech.hermes.api.SubscriptionName;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PartitionAssigningAwareRetransmitter implements ConsumerRebalanceListener {
private static final Logger logger = LoggerFactory.getLogger(PartitionAssigningAwareRetransmitter.class);

private final OffsetMover offsetMover;
private final SubscriptionName subscriptionName;
private final BlockingQueue<SubscriptionPartitionOffset> retransmissionQueue;

public PartitionAssigningAwareRetransmitter(SubscriptionName subscriptionName, int queueSize, KafkaConsumer consumer) {
this(subscriptionName, queueSize, new KafkaConsumerOffsetMover(consumer));
}

public PartitionAssigningAwareRetransmitter(SubscriptionName subscriptionName, int queueSize, OffsetMover offsetMover) {
this.offsetMover = offsetMover;
this.subscriptionName = subscriptionName;
this.retransmissionQueue = new ArrayBlockingQueue<>(queueSize);
}

@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// not interesting for retransmission
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
if (!retransmissionQueue.isEmpty()) {
logger.info("Detected scheduled retransmission for subscription {}", subscriptionName);
moveScheduledOffsets();
}
}

public void moveOffsetOrSchedule(SubscriptionPartitionOffset offset) {
try {
logger.info("Moving offset for subscription {} {}", offset.getSubscriptionName(), offset.toString());
offsetMover.move(offset);
} catch (PartitionNotAssignedException ex) {
logger.info("Failed to move offset right now, reason: {}", ex.getMessage());
boolean scheduled = retransmissionQueue.offer(offset);
if (scheduled) {
logger.info("Scheduled retransmission for subscription {} on next rebalance," +
" offset {}", subscriptionName, offset.toString());
} else {
logger.info("Failed to schedule new retransmission for subscription {}," +
"there is already retransmission scheduled on next rebalance.", subscriptionName);
}
}
}

public boolean isQueueEmpty() {
return retransmissionQueue.isEmpty();
}

private void moveScheduledOffsets() {
List<SubscriptionPartitionOffset> offsets = new ArrayList<>();
retransmissionQueue.drainTo(offsets);
offsets.forEach(offset -> {
try {
offsetMover.move(offset);
} catch (Exception ex) {
logger.info("Still cannot move offset after rebalance for partition {} for subscription {}," +
" possibly owned by different node",
offset.getPartition(), offset.getSubscriptionName(), ex);
}
});
}

interface OffsetMover {
void move(SubscriptionPartitionOffset offset) throws PartitionNotAssignedException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker;

import pl.allegro.tech.hermes.common.exception.RetransmissionException;

public class PartitionNotAssignedException extends RetransmissionException {
public PartitionNotAssignedException() {
super("");
}

public PartitionNotAssignedException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public MessageReceiver createMessageReceiver(Topic topic,
subscription,
clock,
configs.getIntProperty(Configs.CONSUMER_RECEIVER_POOL_TIMEOUT),
configs.getIntProperty(Configs.CONSUMER_RECEIVER_READ_QUEUE_CAPACITY));
configs.getIntProperty(Configs.CONSUMER_RECEIVER_READ_QUEUE_CAPACITY),
configs.getIntProperty(Configs.CONSUMER_RETRANSMISSION_QUEUE_CAPACITY));

if (configs.getBooleanProperty(Configs.CONSUMER_FILTERING_ENABLED)) {
FilteredMessageHandler filteredMessageHandler = new FilteredMessageHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pl.allegro.tech.hermes.common.metric.HermesMetrics;
import pl.allegro.tech.hermes.consumers.consumer.Message;
import pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset;
import pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker.PartitionAssigningAwareRetransmitter;
import pl.allegro.tech.hermes.consumers.consumer.receiver.MessageReceiver;

import java.time.Clock;
Expand All @@ -43,6 +44,7 @@ public class KafkaSingleThreadedMessageReceiver implements MessageReceiver {
private final Clock clock;

private final BlockingQueue<Message> readQueue;
private final PartitionAssigningAwareRetransmitter retransmitter;

private final HermesMetrics metrics;
private Topic topic;
Expand All @@ -60,7 +62,8 @@ public KafkaSingleThreadedMessageReceiver(KafkaConsumer<byte[], byte[]> consumer
Subscription subscription,
Clock clock,
int pollTimeout,
int readQueueCapacity) {
int readQueueCapacity,
int retransmissionQueueCapacity) {
this.metrics = metrics;
this.topic = topic;
this.subscription = subscription;
Expand All @@ -70,8 +73,12 @@ public KafkaSingleThreadedMessageReceiver(KafkaConsumer<byte[], byte[]> consumer
this.consumer = consumer;
this.messageContentWrapper = messageContentWrapper;
this.clock = clock;
this.readQueue = new ArrayBlockingQueue<Message>(readQueueCapacity);
this.consumer.subscribe(topics.keySet());
this.readQueue = new ArrayBlockingQueue<>(readQueueCapacity);
this.retransmitter = new PartitionAssigningAwareRetransmitter(
subscription.getQualifiedName(),
retransmissionQueueCapacity,
consumer);
this.consumer.subscribe(topics.keySet(), retransmitter);
}

private Collection<KafkaTopic> getKafkaTopics(Topic topic, KafkaNamesMapper kafkaNamesMapper) {
Expand Down Expand Up @@ -112,8 +119,8 @@ public Optional<Message> next() {
}

private Message convertToMessage(ConsumerRecord<byte[], byte[]> record) {
UnwrappedMessageContent unwrappedContent = getUnwrappedMessageContent(record);
KafkaTopic kafkaTopic = topics.get(record.topic());
UnwrappedMessageContent unwrappedContent = getUnwrappedMessageContent(record, kafkaTopic.contentType());
return new Message(
unwrappedContent.getMessageMetadata().getId(),
topic.getQualifiedName(),
Expand All @@ -128,10 +135,11 @@ private Message convertToMessage(ConsumerRecord<byte[], byte[]> record) {
);
}

private UnwrappedMessageContent getUnwrappedMessageContent(ConsumerRecord<byte[], byte[]> message) {
if (topic.getContentType() == ContentType.AVRO) {
private UnwrappedMessageContent getUnwrappedMessageContent(ConsumerRecord<byte[], byte[]> message,
ContentType contentType) {
if (contentType == ContentType.AVRO) {
return messageContentWrapper.unwrapAvro(message.value(), topic);
} else if (topic.getContentType() == ContentType.JSON) {
} else if (contentType == ContentType.JSON) {
return messageContentWrapper.unwrapJson(message.value());
}
throw new UnsupportedContentTypeException(topic);
Expand Down Expand Up @@ -173,7 +181,6 @@ private Map<TopicPartition, OffsetAndMetadata> createOffset(Set<SubscriptionPart

@Override
public void moveOffset(SubscriptionPartitionOffset offset) {
logger.info("Moving offset for subscription {} {}", subscription.getQualifiedName(), offset.toString());
consumer.seek(new TopicPartition(offset.getKafkaTopicName().asString(), offset.getPartition()), offset.getOffset());
retransmitter.moveOffsetOrSchedule(offset);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package pl.allegro.tech.hermes.consumers.consumer.offset.kafka.broker

import spock.lang.Specification

import static pl.allegro.tech.hermes.api.SubscriptionName.fromString
import static pl.allegro.tech.hermes.consumers.consumer.offset.SubscriptionPartitionOffset.subscriptionPartitionOffset


class PartitionAssigningAwareRetransmitterTest extends Specification {

def topic = "group.topic"
def subscriptionName = "$topic\$subscription"
def offsetMover = Mock(PartitionAssigningAwareRetransmitter.OffsetMover)
def offset = subscriptionPartitionOffset(topic, subscriptionName, 1, 1)

def retransmitter = new PartitionAssigningAwareRetransmitter(fromString(subscriptionName), 10, offsetMover)

def "should move offset when partitions are already assigned"() {
when:
retransmitter.moveOffsetOrSchedule(offset)

then:
1 * offsetMover.move(offset)
retransmitter.isQueueEmpty()
}

def "should schedule retransmission if consumer does not have partitions assigned yet"() {
when:
retransmitter.moveOffsetOrSchedule(offset)

then:
1 * offsetMover.move(offset) >> { throw new PartitionNotAssignedException() }
!retransmitter.isQueueEmpty()
}

def "should trigger scheduled retransmission when partitions are assigned"() {
when:
retransmitter.moveOffsetOrSchedule(offset)
retransmitter.onPartitionsAssigned([])

then:
2 * offsetMover.move(offset) >> { throw new PartitionNotAssignedException() }
retransmitter.isQueueEmpty()
}

def "should not move offset after rebalance if nothing was scheduled"() {
when:
retransmitter.onPartitionsAssigned([])

then:
0 * offsetMover.move(_)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@ public Optional<RawSchema> getLatestSchema(TopicName topic) {
private Optional<String> extractSchema(String subject, String version, Response response) {
switch (response.getStatusInfo().getFamily()) {
case SUCCESSFUL:
logger.info("Found schema for subject {} at version {}", subject, version);
String schema = response.readEntity(SchemaRegistryResponse.class).getSchema();
return Optional.of(schema);
case CLIENT_ERROR:
logger.error("Could not find schema for subject {} at version {}, reason: {}", subject, version, response.getStatus());
return Optional.empty();
case SERVER_ERROR:
default:
logger.error("Could not find schema for subject {} at version {}, reason: {}", subject, version, response.getStatus());
throw new InternalSchemaRepositoryException(subject, response);
}
}
Expand Down
4 changes: 3 additions & 1 deletion integration/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ def getAlpnVersion() {
return '8.1.9.v20160720'
case 112..120:
return '8.1.10.v20161026'
case 121..151:
case 121..160:
return '8.1.11.v20170118'
case 161..162:
return '8.1.12.v20180117'
default:
throw new IllegalStateException("ALPN version not defined for Java version: ${javaVersion}; extracted minor version: ${version}")
}
Expand Down

0 comments on commit 608ed9c

Please sign in to comment.