Update Kafka consumer #14

Open · wants to merge 1 commit into master
5 changes: 2 additions & 3 deletions pom.xml
@@ -55,8 +55,8 @@

     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.9.2</artifactId>
-      <version>0.8.1.1</version>
+      <artifactId>kafka-clients</artifactId>
+      <version>2.1.1</version>
     </dependency>

     <!-- test dependencies -->
@@ -205,7 +205,6 @@
       <include>org.scala-lang:*</include>
       <include>com.yammer.metrics:*</include>
       <include>org.xerial.snappy:*</include>
-      <include>net.sf.jopt-simple:*</include>
       <include>com.101tec:*</include>
       <include>log4j:*</include>
     </includes>
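Worth noting for reviewers: kafka-clients is the pure-Java client library, so unlike the old kafka_2.9.2 jar it no longer drags in the Scala runtime or the broker-side tooling dependencies. That is presumably why the net.sf.jopt-simple entry can come out of the shade includes; the remaining org.scala-lang and com.yammer.metrics includes likely become no-ops once nothing depends on them.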
42 changes: 9 additions & 33 deletions src/main/java/net/opentsdb/tsd/KafkaRpcPluginConfig.java
@@ -14,10 +14,10 @@
 // limitations under the License.
 package net.opentsdb.tsd;

-import java.util.Map;
-
 import net.opentsdb.utils.Config;

+import java.util.Map;
+
 /**
  * The configuration class for the Kafka Consumer
  */
@@ -37,43 +37,29 @@ public class KafkaRpcPluginConfig extends Config {
   // KAFKA
   public static final String AUTO_COMMIT_INTERVAL_MS =
       "auto.commit.interval.ms";
-  public static final String AUTO_COMMIT_ENABLE = "auto.commit.enable";
+  public static final String AUTO_COMMIT_ENABLE = "enable.auto.commit";
   public static final String AUTO_OFFSET_RESET = "auto.offset.reset";
-  public static final String REBALANCE_BACKOFF_MS = "rebalance.backoff.ms";
-  public static final String REBALANCE_RETRIES = "rebalance.retries.max";
-  public static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS =
-      "zookeeper.connection.timeout.ms";
-  public static final String ZOOKEEPER_SESSION_TIMEOUT_MS =
-      "zookeeper.session.timeout.ms";
+  public static final String REBALANCE_BACKOFF_MS = "rebalance.timeout.ms";

   // KAFKA DEFAULTS
   public static final int AUTO_COMMIT_INTERVAL_DEFAULT = 5000;
-  public static final String AUTO_COMMIT_ENABLE_DEFAULT = "true";
-  public static final String AUTO_OFFSET_RESET_DEFAULT = "smallest";
+  public static final String AUTO_COMMIT_ENABLE_DEFAULT = "false";
+  public static final String AUTO_OFFSET_RESET_DEFAULT = "latest";
Contributor:

To keep the changeset as small as possible and avoid surprises for users, I'd lean toward keeping the old defaults in place. Is there a particular reason these two defaults changed?

Contributor Author:

Auto-commit is off by default because this code commits manually after we insert a batch of records.

For the offset reset, Kafka renamed the allowed values in the new consumer: the old "smallest"/"largest" became "earliest"/"latest".
   public static final int REBALANCE_BACKOFF_MS_DEFAULT = 60000;
-  public static final int REBALANCE_RETRIES_DEFAULT = 20;
-  public static final int ZK_CONNECTION_TIMEOUT_DEFAULT = 60000;
-  public static final int ZK_SESSION_TIMEOUT_DEFAULT = 60000;

   // Producer defaults for the requeue
   public static final String REQUEUE_CONFIG_PREFIX = PLUGIN_PROPERTY_BASE + "seh";
   public static final String KAFKA_TOPIC_PREFIX = REQUEUE_CONFIG_PREFIX + ".topic";

   // Kafka pass through values
   public static final String KAFKA_BROKERS = KAFKA_CONFIG_PREFIX +
-      "metadata.broker.list";
+      "bootstrap.servers";
   public static final String REQUIRED_ACKS = KAFKA_CONFIG_PREFIX +
-      "request.required.acks";
+      "acks";
   public static final String REQUEST_TIMEOUT = KAFKA_CONFIG_PREFIX +
       "request.timeout.ms";
   public static final String MAX_RETRIES = KAFKA_CONFIG_PREFIX +
-      "send.max_retries";
-  public static final String PARTITIONER_CLASS = KAFKA_CONFIG_PREFIX +
-      "partitioner.class";
-  public static final String PRODUCER_TYPE = KAFKA_CONFIG_PREFIX +
-      "producer.type";
-  public static final String KEY_SERIALIZER = KAFKA_CONFIG_PREFIX +
-      "key.serializer.class";
+      "retries";

   /**
    * Default ctor
@@ -104,22 +90,12 @@ protected void setLocalDefaults() {
         AUTO_OFFSET_RESET_DEFAULT);
     default_map.put(KAFKA_CONFIG_PREFIX + REBALANCE_BACKOFF_MS,
         Integer.toString(REBALANCE_BACKOFF_MS_DEFAULT));
-    default_map.put(KAFKA_CONFIG_PREFIX + REBALANCE_RETRIES,
-        Integer.toString(REBALANCE_RETRIES_DEFAULT));
-    default_map.put(KAFKA_CONFIG_PREFIX + ZOOKEEPER_SESSION_TIMEOUT_MS,
-        Integer.toString(ZK_SESSION_TIMEOUT_DEFAULT));
-    default_map.put(KAFKA_CONFIG_PREFIX + ZOOKEEPER_CONNECTION_TIMEOUT_MS,
-        Integer.toString(ZK_CONNECTION_TIMEOUT_DEFAULT));
     default_map.put(METRIC_AGG_FREQUENCY,
         Integer.toString(DEFAULT_METRIC_AGG_FREQUENCY));

     default_map.put(REQUIRED_ACKS, "0");
     default_map.put(REQUEST_TIMEOUT, "10000");
     default_map.put(MAX_RETRIES, "1000");
-    default_map.put(PRODUCER_TYPE, "async");
-    default_map.put(KEY_SERIALIZER, "kafka.serializer.StringEncoder");
-    default_map.put(PARTITIONER_CLASS,
-        "net.opentsdb.tsd.KafkaSimplePartitioner");

     for (Map.Entry<String, String> entry : default_map.entrySet()) {
       if (!hasProperty(entry.getKey())) {
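To see how the renamed consumer settings fit together at runtime, here is a minimal, self-contained sketch of the poll-then-commit pattern the author describes in the thread above. This is not code from the PR; the broker address, group id, topic name, and string deserializers are illustrative assumptions.

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualCommitSketch {
  public static void main(final String[] args) {
    final Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // was metadata.broker.list
    props.put("group.id", "tsdb-example");            // illustrative group id
    props.put("enable.auto.commit", "false");         // was auto.commit.enable
    props.put("auto.offset.reset", "latest");         // was smallest/largest
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Arrays.asList("tsdb-example-topic"));
      while (true) {
        final ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(1000));
        for (final ConsumerRecord<String, String> record : records) {
          process(record); // write the batch downstream first...
        }
        consumer.commitSync(); // ...then commit manually, as described above
      }
    }
  }

  private static void process(final ConsumerRecord<String, String> record) {
    // placeholder for the real write path
  }
}

The producer pass-through keys in this file follow the same renaming scheme: bootstrap.servers, acks, and retries are the new-client spellings of metadata.broker.list, request.required.acks, and send.max_retries.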
18 changes: 9 additions & 9 deletions src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java
@@ -15,10 +15,7 @@
 package net.opentsdb.tsd;

 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -32,7 +29,6 @@

 import com.google.common.util.concurrent.RateLimiter;

-import joptsimple.internal.Strings;
 import net.opentsdb.data.deserializers.Deserializer;
 import net.opentsdb.utils.PluginLoader;
@@ -133,7 +129,7 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) {

     final String deser_class = config.getString(
         KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + groupID + ".deserializer");
-    if (Strings.isNullOrEmpty(deser_class)) {
+    if (deser_class == null || deser_class.isEmpty()) {
       throw new IllegalArgumentException("Deserializer class cannot be null or empty.");
     }
@@ -177,7 +173,7 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) {
     }

     for (int i = 0; i < num_threads; i++) {
-      kafka_consumers.add(new KafkaRpcPluginThread(this, i, topics));
+      kafka_consumers.add(new KafkaRpcPluginThread(this, i, Arrays.asList(topics.split(","))));
     }

     timer.newTimeout(this, config.threadCheckInterval(), TimeUnit.MILLISECONDS);
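The third constructor argument changes type here because the new consumer API subscribes to a Collection<String> rather than a comma-separated string. A tiny sketch of the conversion (topic names are made up):

import java.util.Arrays;
import java.util.List;

public class TopicSplitSketch {
  // Mirrors the Arrays.asList(topics.split(",")) call in the diff above.
  static List<String> parseTopics(final String commaSeparated) {
    return Arrays.asList(commaSeparated.split(","));
  }

  public static void main(final String[] args) {
    System.out.println(parseTopics("tsdb.metrics,tsdb.rollups"));
    // prints: [tsdb.metrics, tsdb.rollups]
  }
}

One caveat: split(",") keeps surrounding whitespace, so a configured value of "a, b" produces a topic named " b"; splitting on "\\s*,\\s*" or trimming each entry would be more forgiving.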
@@ -213,7 +209,11 @@ public void start() {
   /** Gracefully shuts down all of the consumer threads */
   public void shutdown() {
     for (final KafkaRpcPluginThread consumer : kafka_consumers) {
-      consumer.shutdown();
+      try {
+        consumer.close();
+      } catch (InterruptedException e) {
+        LOG.error("Unexpected InterruptedException while closing consumer " + consumer, e);
+      }
     }
   }

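A caveat on the close() call above: the new KafkaConsumer is not safe for multi-threaded access, so close() must run on the thread that polls, and the only consumer method that may be called from another thread is wakeup(). A minimal sketch of that shutdown idiom, assuming KafkaRpcPluginThread wraps its consumer roughly like this (class and member names are hypothetical):

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerShutdownSketch extends Thread {
  private final KafkaConsumer<String, String> consumer;
  private final AtomicBoolean running = new AtomicBoolean(true);

  public ConsumerShutdownSketch(final KafkaConsumer<String, String> consumer) {
    this.consumer = consumer;
  }

  @Override
  public void run() {
    try {
      while (running.get()) {
        consumer.poll(Duration.ofMillis(1000)); // process records here
      }
    } catch (WakeupException e) {
      // expected after shutdownGracefully() calls wakeup(); nothing to do
    } finally {
      consumer.close(); // safe: invoked on the polling thread itself
    }
  }

  // What a group-level shutdown() would call instead of close() directly.
  public void shutdownGracefully() {
    running.set(false);
    consumer.wakeup(); // the one thread-safe KafkaConsumer method
  }
}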
@@ -363,7 +363,7 @@ public void setRate(final double rate) {
       LOG.info("The rate has been set to zero for " + this + ". Killing threads.");
       for (final KafkaRpcPluginThread writer : kafka_consumers) {
         try {
-          writer.shutdown();
+          writer.close();
         } catch (Exception e) {
           LOG.error("Exception shutting down thread " + writer, e);
         }