-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added timeouts on send and flush calls in KafkaProducerWrapper #696
base: master
Are you sure you want to change the base?
Changes from 4 commits
fe19a50
abf10d5
3134d6f
988d8a5
1b8571e
9a11735
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
/** | ||
* Copyright 2020 LinkedIn Corporation. All rights reserved. | ||
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information. | ||
* See the NOTICE file in the project root for additional information regarding copyright ownership. | ||
*/ | ||
package com.linkedin.datastream.kafka; | ||
|
||
import java.time.Duration; | ||
import java.util.Properties; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CompletionException; | ||
|
||
import org.apache.kafka.clients.producer.Callback; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerRecord; | ||
import org.apache.kafka.clients.producer.RecordMetadata; | ||
import org.apache.kafka.common.errors.InterruptException; | ||
|
||
import com.linkedin.datastream.common.CompletableFutureUtils; | ||
import com.linkedin.datastream.common.VerifiableProperties; | ||
|
||
/** | ||
* An extension of {@link KafkaProducerWrapper} with bounded calls for flush and send | ||
*/ | ||
class BoundedKafkaProducerWrapper<K, V> extends KafkaProducerWrapper<K, V> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
private static final int DEFAULT_SEND_TIME_OUT = 5000; | ||
private static final int DEFAULT_FLUSH_TIME_OUT = 10 * 60 * 1000; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
private static final String SEND_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.sendTimeout"; | ||
private static final String FLUSH_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.flushTimeout"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't the right way to pass configs. Any configs you want to pass here need to be scoped under the transport provider configs. The KafkaProducerWrapper receives all of the transport provider configs, so you'd want to add these under that scope. We don't need to access the full property name, because as the configs pass through the layers, the relevant prefixes are removed. You can see how other configs are accessed in KafkaProducerWrapper, you'll see they don't have that whole "brooklin.server" prefix. You can access these the same way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @somandal I've discussed this with Ahmed. Will push the fixe soon |
||
|
||
private int _sendTimeout; | ||
private int _flushTimeout; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
BoundedKafkaProducerWrapper(String logSuffix, Properties props, String metricsNamesPrefix) { | ||
super(logSuffix, props, metricsNamesPrefix); | ||
|
||
VerifiableProperties properties = new VerifiableProperties(props); | ||
_sendTimeout = properties.getInt(SEND_TIMEOUT_CONFIG_KEY, DEFAULT_SEND_TIME_OUT); | ||
_flushTimeout = properties.getInt(FLUSH_TIMEOUT_CONFIG_KEY, DEFAULT_FLUSH_TIME_OUT); | ||
} | ||
|
||
@Override | ||
void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) { | ||
CompletableFutureUtils.within(produceMessage(producer, record), Duration.ofMillis(_sendTimeout)) | ||
.thenAccept(m -> callback.onCompletion(m, null)) | ||
.exceptionally(completionEx -> { | ||
Throwable cause = completionEx.getCause(); | ||
if (cause instanceof KafkaClientException) { | ||
KafkaClientException ex = (KafkaClientException) cause; | ||
callback.onCompletion(ex.getMetadata(), (Exception) ex.getCause()); | ||
} else if (cause instanceof java.util.concurrent.TimeoutException) { | ||
_log.warn("KafkaProducerWrapper send timed out. The destination topic may be unavailable."); | ||
callback.onCompletion(null, (java.util.concurrent.TimeoutException) cause); | ||
} | ||
return null; | ||
}); | ||
} | ||
|
||
private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer, ProducerRecord<K, V> record) { | ||
CompletableFuture<RecordMetadata> future = new CompletableFuture<>(); | ||
|
||
producer.send(record, (metadata, exception) -> { | ||
if (exception == null) { | ||
future.complete(metadata); | ||
} else { | ||
future.completeExceptionally(new KafkaClientException(metadata, exception)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need to propagate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There were tons of checks up the callback chain that are dealing with the metadata and exception. You're probably right, I need to see if it's safe to do so and remove |
||
} | ||
}); | ||
|
||
return future; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thinking out loud:
This seems like something this method can do with a private executor service. I am not sure we really need a utils class just for this purpose. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ahmedahamid I just think that cancellation and timeout are semantically different. We may want to cancel the future after timeout in our case, but that's not necessarily true in general. Also, smth may be cancelled without waiting for timeout (based on user input or other external factors). Just thinking out loud. Will see whether I can get rid of the utils. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I didn't mean to suggest we should propagate |
||
|
||
@Override | ||
synchronized void flush() { | ||
if (_kafkaProducer != null) { | ||
try { | ||
CompletableFutureUtils.within(CompletableFuture.runAsync(() -> _kafkaProducer.flush()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I'm afraid our only option here seems to be using an // It's okay to use a single thread executor since flush() is synchronized
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> super.flush());
try {
// Block until timeout elapses
future.get(_flushTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
...
// Interrupt the Producer.flush() call to free up the blocked thread
future.cancel(true);
...
} |
||
Duration.ofMillis(_flushTimeout)).join(); | ||
} catch (CompletionException e) { | ||
Throwable cause = e.getCause(); | ||
|
||
if (cause instanceof InterruptException) { | ||
// The KafkaProducer object should not be reused on an interrupted flush | ||
_log.warn("Kafka producer flush interrupted, closing producer {}.", _kafkaProducer); | ||
shutdownProducer(); | ||
throw (InterruptException) cause; | ||
} else if (cause instanceof java.util.concurrent.TimeoutException) { | ||
_log.warn("Kafka producer flush timed out after {}ms. Destination topic may be unavailable.", _flushTimeout); | ||
} | ||
|
||
throw e; | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
/** | ||
* Copyright 2020 LinkedIn Corporation. All rights reserved. | ||
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information. | ||
* See the NOTICE file in the project root for additional information regarding copyright ownership. | ||
*/ | ||
|
||
package com.linkedin.datastream.kafka; | ||
import org.apache.kafka.clients.producer.RecordMetadata; | ||
|
||
class KafkaClientException extends Exception { | ||
private static final long serialVersionUID = 1; | ||
|
||
private final RecordMetadata _metadata; | ||
|
||
public RecordMetadata getMetadata() { | ||
return _metadata; | ||
} | ||
|
||
public KafkaClientException(RecordMetadata metadata, Throwable innerException) { | ||
super(innerException); | ||
_metadata = metadata; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.function.Supplier; | ||
|
||
import org.apache.commons.lang.exception.ExceptionUtils; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nothing wrong with this but I'd recommend This would also entail adding an explicit dependency for this module on it in project(':datastream-kafka') {
dependencies {
...
compile "org.apache.commons:commons-lang3:$commonslang3Version"
|
||
import org.apache.kafka.clients.producer.Callback; | ||
import org.apache.kafka.clients.producer.Producer; | ||
import org.apache.kafka.clients.producer.ProducerConfig; | ||
|
@@ -60,7 +61,8 @@ class KafkaProducerWrapper<K, V> { | |
|
||
private static final int TIME_OUT = 2000; | ||
private static final int MAX_SEND_ATTEMPTS = 10; | ||
private final Logger _log; | ||
|
||
protected final Logger _log; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move this with other protected member variables |
||
private final long _sendFailureRetryWaitTimeMs; | ||
|
||
private final String _clientId; | ||
|
@@ -72,8 +74,7 @@ class KafkaProducerWrapper<K, V> { | |
// Producer is lazily initialized during the first send call. | ||
// Also, can be nullified in case of exceptions, and recreated by subsequent send calls. | ||
// Mark as volatile as it is mutable and used by different threads | ||
private volatile Producer<K, V> _kafkaProducer; | ||
|
||
protected volatile Producer<K, V> _kafkaProducer; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's not necessary to mark member fields/methods Since |
||
private final KafkaProducerFactory<K, V> _producerFactory; | ||
|
||
// Limiter to control how fast producers are re-created after failures. | ||
|
@@ -142,7 +143,7 @@ private void populateDefaultProducerConfigs() { | |
DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_VALUE); | ||
} | ||
|
||
private Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) { | ||
protected Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see you overriding this in your Bounded implementation, why make this protected? |
||
Producer<K, V> producer = _kafkaProducer; | ||
if (producer == null) { | ||
producer = initializeProducer(task); | ||
|
@@ -186,56 +187,62 @@ Producer<K, V> createKafkaProducer() { | |
return _producerFactory.createProducer(_props); | ||
} | ||
|
||
/** | ||
* There are two known cases that lead to IllegalStateException and we should retry: | ||
* (1) number of brokers is less than minISR | ||
* (2) producer is closed in generateSendFailure by another thread | ||
* (3) For either condition, we should retry as broker comes back healthy or producer is recreated | ||
*/ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason why you moved these comments out? Also, I see that you've made the last line into (3), whereas it just talks about the above two points? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 To Sonam's first point: I think those comments are closely related to the logic within the method since they discuss impl details like exception types. |
||
void send(DatastreamTask task, ProducerRecord<K, V> producerRecord, Callback onComplete) | ||
throws InterruptedException { | ||
// There are two known cases that lead to IllegalStateException and we should retry: | ||
// 1) number of brokers is less than minISR | ||
// 2) producer is closed in generateSendFailure by another thread | ||
// For either condition, we should retry as broker comes back healthy or producer is recreated | ||
boolean retry = true; | ||
int numberOfAttempt = 0; | ||
int numberOfAttempts = 0; | ||
|
||
while (retry) { | ||
try { | ||
++numberOfAttempt; | ||
maybeGetKafkaProducer(task).ifPresent(p -> p.send(producerRecord, (metadata, exception) -> { | ||
if (exception == null) { | ||
onComplete.onCompletion(metadata, null); | ||
} else { | ||
onComplete.onCompletion(metadata, generateSendFailure(exception)); | ||
} | ||
})); | ||
numberOfAttempts++; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. change this back to ++numberOfAttempts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 I like the rename (added In fact, it wouldn't be such a bad idea to turn that loop into a for loop: for (int numberOfAttempts = 1; retry; ++numberOfAttempts) {
...
} or even eliminate for (int numberOfAttempts = 1;; ++numberOfAttempts) {
try {
maybeGetKafkaProducer(task).ifPresent(p -> doSend(p, producerRecord, onComplete));
return;
} catch (...) {
...
}
} |
||
|
||
maybeGetKafkaProducer(task).ifPresent(p -> doSend(p, producerRecord, onComplete)); | ||
|
||
retry = false; | ||
} catch (IllegalStateException e) { | ||
//The following exception should be quite rare as most exceptions will be throw async callback | ||
_log.warn("Either send is called on a closed producer or broker count is less than minISR, retry in {} ms.", | ||
_sendFailureRetryWaitTimeMs, e); | ||
Thread.sleep(_sendFailureRetryWaitTimeMs); | ||
} catch (TimeoutException e) { | ||
_log.warn("Kafka producer buffer is full, retry in {} ms.", _sendFailureRetryWaitTimeMs, e); | ||
} catch (TimeoutException ex) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Was there some reason why you had to reorder the catches here? In the previous code, we catch IllegalStateException, and then catch TimeoutException, and then catch KafkaException. Let's not reorder these unless there is a good reason to. It also becomes harder to review since I can't easily see the actual changes vs. reordering. Also you renamed all exception 'e' to 'ex', is that necessary? |
||
_log.warn("Kafka producer buffer is full, retry in {} ms.", _sendFailureRetryWaitTimeMs, ex); | ||
Thread.sleep(_sendFailureRetryWaitTimeMs); | ||
} catch (KafkaException e) { | ||
Throwable cause = e.getCause(); | ||
while (cause instanceof KafkaException) { | ||
cause = cause.getCause(); | ||
} | ||
// Set a max_send_attempts for KafkaException as it may be non-recoverable | ||
if (numberOfAttempt > MAX_SEND_ATTEMPTS || ((cause instanceof Error || cause instanceof RuntimeException))) { | ||
_log.error("Send failed for partition {} with a non retriable exception", producerRecord.partition(), e); | ||
throw generateSendFailure(e); | ||
} catch (IllegalStateException ex) { | ||
// The following exception should be quite rare as most exceptions will be throw async callback | ||
_log.warn("Either send is called on a closed producer or broker count is less than minISR, retry in {} ms.", | ||
_sendFailureRetryWaitTimeMs, ex); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like the earlier IllegalStateException block would sleep and you've removed it. why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. This is definitely a bug I introduced. Will revert the entire exception handling piece in |
||
} catch (KafkaException ex) { | ||
Throwable rootCause = ExceptionUtils.getRootCause(ex); | ||
if (numberOfAttempts > MAX_SEND_ATTEMPTS || | ||
(rootCause instanceof Error || rootCause instanceof RuntimeException)) { | ||
// Set a max_send_attempts for KafkaException as it may be non-recoverable | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This comment doesn't make sense here, move it back to outside the if condition? |
||
_log.error("Send failed for partition {} with a non retriable exception", producerRecord.partition(), ex); | ||
throw generateSendFailure(ex); | ||
} else { | ||
// The exception might be recoverable. Retry will be attempted | ||
_log.warn("Send failed for partition {} with retriable exception, retry {} out of {} in {} ms.", | ||
producerRecord.partition(), numberOfAttempt, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, e); | ||
producerRecord.partition(), numberOfAttempts, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, ex); | ||
Thread.sleep(_sendFailureRetryWaitTimeMs); | ||
} | ||
} catch (Exception e) { | ||
_log.error("Send failed for partition {} with an exception", producerRecord.partition(), e); | ||
throw generateSendFailure(e); | ||
} catch (Exception ex) { | ||
_log.error("Send failed for partition {} with an exception", producerRecord.partition(), ex); | ||
throw generateSendFailure(ex); | ||
} | ||
} | ||
} | ||
|
||
private synchronized void shutdownProducer() { | ||
void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this intended to be package private? If so, add a comment, otherwise add the appropriate public/protected/private There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @somandal There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, makes sense. Just keep things consistent, i.e. if you need a more visible scope for multiple methods, declare them all as package private, and the rest as private. Don't mix protected and package private without having a very good reason to. |
||
producer.send(record, (metadata, exception) -> { | ||
if (exception == null) { | ||
callback.onCompletion(metadata, null); | ||
} else { | ||
callback.onCompletion(metadata, generateSendFailure(exception)); | ||
} | ||
}); | ||
} | ||
|
||
protected synchronized void shutdownProducer() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be made package private (no modifier) instead. The same applies to |
||
Producer<K, V> producer = _kafkaProducer; | ||
// Nullify first to prevent subsequent send() to use | ||
// the current producer which is being shutdown. | ||
|
@@ -246,7 +253,7 @@ private synchronized void shutdownProducer() { | |
} | ||
} | ||
|
||
private DatastreamRuntimeException generateSendFailure(Exception exception) { | ||
protected DatastreamRuntimeException generateSendFailure(Exception exception) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see you overriding this in your Bounded implementation, why make this protected? |
||
_dynamicMetricsManager.createOrUpdateMeter(_metricsNamesPrefix, AGGREGATE, PRODUCER_ERROR, 1); | ||
if (exception instanceof IllegalStateException) { | ||
_log.warn("sent failure transiently, exception: ", exception); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/** | ||
* Copyright 2020 LinkedIn Corporation. All rights reserved. | ||
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information. | ||
* See the NOTICE file in the project root for additional information regarding copyright ownership. | ||
*/ | ||
package com.linkedin.datastream.common; | ||
|
||
import java.time.Duration; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.function.Function; | ||
|
||
import org.apache.commons.lang.NullArgumentException; | ||
|
||
import com.google.common.util.concurrent.ThreadFactoryBuilder; | ||
|
||
/** | ||
* Utilities for working with CompletableFutures | ||
*/ | ||
public class CompletableFutureUtils { | ||
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1, | ||
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("failAfter-%d").build()); | ||
|
||
/** | ||
* Returns a CompletableFuture which fails with a TimeoutException after the given interval | ||
* @param duration Duration after which to fail | ||
*/ | ||
public static <T> CompletableFuture<T> failAfter(Duration duration) { | ||
final CompletableFuture<T> promise = new CompletableFuture<>(); | ||
SCHEDULER.schedule(() -> { | ||
TimeoutException ex = new TimeoutException(String.format("Timeout after {}ms", duration)); | ||
return promise.completeExceptionally(ex); | ||
}, duration.toMillis(), TimeUnit.MILLISECONDS); | ||
return promise; | ||
} | ||
|
||
/** | ||
* Returns a {@link CompletableFuture} which either successfully executes the given future, or fails with timeout | ||
* after the given duration | ||
* @param future Future to execute | ||
* @param duration Timeout duration | ||
* @throws NullArgumentException | ||
*/ | ||
public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) throws | ||
NullArgumentException { | ||
if (future == null) { | ||
throw new NullArgumentException("future"); | ||
} | ||
|
||
CompletableFuture<T> timeout = failAfter(duration); | ||
return future.applyToEither(timeout, Function.identity()); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This wouldn't be the right way to pass configs to your KafkaProducerWrapper. Let's discuss how to do this offline.