entry : default_map.entrySet()) {
if (!hasProperty(entry.getKey())) {
diff --git a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java
index a110e9d..2c93d2d 100644
--- a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java
+++ b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java
@@ -15,10 +15,7 @@
package net.opentsdb.tsd;
import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -32,7 +29,6 @@
import com.google.common.util.concurrent.RateLimiter;
-import joptsimple.internal.Strings;
import net.opentsdb.data.deserializers.Deserializer;
import net.opentsdb.utils.PluginLoader;
@@ -133,7 +129,7 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) {
final String deser_class = config.getString(
KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + groupID + ".deserializer");
- if (Strings.isNullOrEmpty(deser_class)) {
+ if (deser_class == null || deser_class.isEmpty()) {
throw new IllegalArgumentException("Deserializer class cannot be null or empty.");
}
@@ -177,7 +173,7 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) {
}
for (int i = 0; i < num_threads; i++) {
- kafka_consumers.add(new KafkaRpcPluginThread(this, i, topics));
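+ // The new consumer API takes an explicit topic list, so the comma-separated config value is split here.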
+ kafka_consumers.add(new KafkaRpcPluginThread(this, i, Arrays.asList(topics.split(","))));
}
timer.newTimeout(this, config.threadCheckInterval(), TimeUnit.MILLISECONDS);
@@ -213,7 +209,11 @@ public void start() {
/** Gracefully shuts down all of the consumer threads */
public void shutdown() {
for (final KafkaRpcPluginThread consumer : kafka_consumers) {
- consumer.shutdown();
+ try {
+ consumer.close();
+ } catch (InterruptedException e) {
+ LOG.error("Unexpected interruptedexception while closing consumer "+consumer);
+ }
}
}
@@ -363,7 +363,7 @@ public void setRate(final double rate) {
LOG.info("The rate has been set to zero for " + this + ". Killing threads.");
for (final KafkaRpcPluginThread writer : kafka_consumers) {
try {
- writer.shutdown();
+ writer.close();
} catch (Exception e) {
LOG.error("Exception shutting down thread " + writer, e);
}
diff --git a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginThread.java b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginThread.java
index f83eea1..fb5590b 100644
--- a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginThread.java
+++ b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginThread.java
@@ -17,34 +17,25 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AtomicDouble;
import com.google.common.util.concurrent.RateLimiter;
-
-import joptsimple.internal.Strings;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import kafka.common.ConsumerRebalanceFailedException;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.TopicFilter;
-import kafka.consumer.Whitelist;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
import net.opentsdb.core.IncomingDataPoint;
import net.opentsdb.core.TSDB;
import net.opentsdb.data.TypedIncomingData;
import net.opentsdb.data.deserializers.Deserializer;
import net.opentsdb.tsd.KafkaRpcPluginGroup.TsdbConsumerType;
-
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* New and Improved Kafka to TSDB Writer! Now with TSDB Integration!
*
@@ -54,9 +45,8 @@
* the value is logged and bit-bucketed.
*/
public class KafkaRpcPluginThread extends Thread {
- static final String CONSUMER_ID = "consumer.id";
+ static final String CONSUMER_ID = "client.id";
static final String GROUP_ID = "group.id";
- static final String AGG_TAG = "_aggregate";
static final String DEFAULT_COUNTER_ID = "null";
private static final Logger LOG = LoggerFactory.getLogger(
KafkaRpcPluginThread.class);
@@ -119,12 +109,13 @@ public String toString() {
private final int thread_id;
private final KafkaRpcPluginGroup group;
private final String consumer_id;
- private final TopicFilter topic_filter;
+ private final List topics;
private final int number_consumer_streams = 1;
private final RateLimiter rate_limiter;
private final TsdbConsumerType consumer_type;
private final long requeue_delay;
private final AtomicBoolean thread_running = new AtomicBoolean();
+ private final AtomicBoolean shouldClose = new AtomicBoolean();
private final AtomicLong messagesReceived = new AtomicLong();
private final AtomicLong datapointsReceived = new AtomicLong();
@@ -132,8 +123,8 @@ public String toString() {
private final AtomicDouble cumulativeRateDelay = new AtomicDouble();
private final AtomicDouble kafkaWaitTime = new AtomicDouble();
private final Deserializer deserializer;
-
- private ConsumerConnector consumer;
+
+ private KafkaConsumer consumer;
/**
* Default ctor
@@ -142,7 +133,7 @@ public String toString() {
* @param topics The topic list to subscribe to
*/
public KafkaRpcPluginThread(final KafkaRpcPluginGroup group,
- final int threadID, final String topics) {
+ final int threadID, final List topics) {
if (topics == null || topics.isEmpty()) {
throw new IllegalArgumentException("Missing topics");
}
@@ -173,7 +164,7 @@ public KafkaRpcPluginThread(final KafkaRpcPluginGroup group,
this.consumer_type = group.getConsumerType();
thread_running.set(false);
- topic_filter = new Whitelist(topics);
+ this.topics = new ArrayList(topics);
consumer_id = threadID + "_" + group.getParent().getHost();
if (consumer_type == TsdbConsumerType.REQUEUE_RAW) {
if (group.getParent().getConfig().hasProperty(
@@ -200,10 +191,12 @@ public String toString() {
* @return The consumer connector
* @throws IllegalArgumentException if the config is invalid
*/
- ConsumerConnector buildConsumerConnector() {
+ KafkaConsumer buildConsumerConnector() {
final Properties properties = buildConsumerProperties();
- return kafka.consumer.Consumer
- .createJavaConsumerConnector(new ConsumerConfig(properties));
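+ // Build the new-style KafkaConsumer and subscribe it to the topic list, replacing the old
+ // ConsumerConnector with its Whitelist topic filter and message streams.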
+ final KafkaConsumer consumer = new KafkaConsumer(properties);
+ consumer.subscribe(topics);
+ LOG.info("Initializing consumer. consumer id: " + consumer_id + ", topics: " + topics.toString() + ", props: " + properties);
+ return consumer;
}
/**
@@ -240,8 +233,8 @@ Properties buildConsumerProperties() {
// settings from other parts of our code base
properties.put(GROUP_ID, group.getGroupID());
properties.put(CONSUMER_ID, consumer_id);
- LOG.info("Initializing consumer config with consumer id " + consumer_id +
- " and props " + properties);
+ properties.put("key.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer");
+ properties.put("value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer");
return properties;
}
@@ -249,105 +242,101 @@ Properties buildConsumerProperties() {
public void run() {
thread_running.set(true);
Thread.currentThread().setName(group.getGroupID() + "_" + consumer_id);
-
+
try {
consumer = buildConsumerConnector();
- // We're only fetching one stream at a time per thread so we only care
- // about the first one.
- final KafkaStream stream =
- consumer.createMessageStreamsByFilter(topic_filter,
- number_consumer_streams).get(0);
- final ConsumerIterator it = stream.iterator();
int errorCount = 0;
long nanoCtr = System.nanoTime();
- while (it.hasNext()) {
- try {
- // The following line will block until a message has been read from
- // the stream.
- final MessageAndMetadata message = it.next();
- long delay = System.nanoTime() - nanoCtr;
- if (nanoCtr > 0) {
- kafkaWaitTime.addAndGet(((double) delay / (double)1000000));
- }
- messagesReceived.incrementAndGet();
-
- // Now that we have received the message, we should record the present
- // system time.
- final long recvTime = System.currentTimeMillis();
-
- switch (consumer_type) {
- case RAW:
- case ROLLUP:
- // Deserialize the event from the received (opaque) message.
- final List eventList =
- deserializer.deserialize(this, message.message());
- if (eventList == null) {
- deserializationErrors.incrementAndGet();
- continue;
- }
-
- // I ♡ Google! It's so easy! No release necessary! Thread Safe!
- final double waiting = rate_limiter.acquire();
- cumulativeRateDelay.addAndGet(waiting);
- datapointsReceived.addAndGet(eventList.size());
- for (TypedIncomingData ev : eventList) {
- ev.processData(this, recvTime);
- }
- break;
- case REQUEUE_RAW:
- case REQUEUE_ROLLUP:
- case UID_ABUSE:
- final List requeuedList =
- deserializer.deserialize(this, message.message());
- if (requeuedList == null) {
- deserializationErrors.incrementAndGet();
- continue;
+ while (!shouldClose.get()) {
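+ // poll() blocks until records arrive or wakeup() interrupts it with a WakeupException.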
+ ConsumerRecords records = consumer.poll(Long.MAX_VALUE);
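+ // The real client never returns null from poll(); the unit tests stub a null to end the loop.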
+ if (records == null) {
+ break;
+ }
+
+ for (ConsumerRecord record : records) {
+ try {
+ long delay = System.nanoTime() - nanoCtr;
+ if (nanoCtr > 0) {
+ kafkaWaitTime.addAndGet(((double) delay / (double) 1000000));
}
-
- // to avoid tight requeue loops we want to sleep a spell
- // if we receive a data point that was recently added
- // to the queue
- datapointsReceived.addAndGet(requeuedList.size());
- for(TypedIncomingData ev : requeuedList) {
- final long requeueDiff = System.currentTimeMillis() - ev.getRequeueTS();
- if (requeueDiff < requeue_delay) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleeping for " + (requeue_delay - requeueDiff)
- + " ms due to requeue delay");
+ messagesReceived.incrementAndGet();
+
+ // Now that we have received the message, we should record the present
+ // system time.
+ final long recvTime = System.currentTimeMillis();
+
+ switch (consumer_type) {
+ case RAW:
+ case ROLLUP:
+ // Deserialize the event from the received (opaque) message.
+ final List eventList =
+ deserializer.deserialize(this, record.value().get());
+ if (eventList == null) {
+ deserializationErrors.incrementAndGet();
+ continue;
}
- //incrementCounter(CounterType.RequeuesDelayed, ns);
- Thread.sleep(requeue_delay - requeueDiff);
- }
- // I ♡ Google! It's so easy! No release necessary! Thread Safe!
- final double w = rate_limiter.acquire();
- cumulativeRateDelay.addAndGet(w);
- ev.processData(this, recvTime);
- }
- break;
- default:
- throw new IllegalStateException("Unknown consumer type: " +
+ // I ♡ Google! It's so easy! No release necessary! Thread Safe!
+ final double waiting = rate_limiter.acquire();
+ cumulativeRateDelay.addAndGet(waiting);
+ datapointsReceived.addAndGet(eventList.size());
+ for (TypedIncomingData ev : eventList) {
+ ev.processData(this, recvTime);
+ }
+ break;
+ case REQUEUE_RAW:
+ case REQUEUE_ROLLUP:
+ case UID_ABUSE:
+ final List requeuedList =
+ deserializer.deserialize(this, record.value().get());
+ if (requeuedList == null) {
+ deserializationErrors.incrementAndGet();
+ continue;
+ }
+
+ // to avoid tight requeue loops we want to sleep a spell
+ // if we receive a data point that was recently added
+ // to the queue
+ datapointsReceived.addAndGet(requeuedList.size());
+ for (TypedIncomingData ev : requeuedList) {
+ final long requeueDiff = System.currentTimeMillis() - ev.getRequeueTS();
+ if (requeueDiff < requeue_delay) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sleeping for " + (requeue_delay - requeueDiff)
+ + " ms due to requeue delay");
+ }
+ //incrementCounter(CounterType.RequeuesDelayed, ns);
+ Thread.sleep(requeue_delay - requeueDiff);
+ }
+
+ // I ♡ Google! It's so easy! No release necessary! Thread Safe!
+ final double w = rate_limiter.acquire();
+ cumulativeRateDelay.addAndGet(w);
+ ev.processData(this, recvTime);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unknown consumer type: " +
consumer_type + " for " + this);
+ }
+ errorCount = 0;
+ } catch (RuntimeException e) {
+ LOG.error("Exception in kafkaReader or Tsdb Writer ", e);
+ incrementNamespaceCounter(CounterType.Exception, null);
+ errorCount++;
+ if (errorCount >= 5) {
+ LOG.error("Too many errors, Killing the consumer thread " + this);
+ throw e;
+ }
}
- errorCount = 0;
- } catch (RuntimeException e) {
- LOG.error("Exception in kafkaReader or Tsdb Writer ", e);
- incrementNamespaceCounter(CounterType.Exception, null);
- errorCount++;
- if (errorCount >= 5) {
- LOG.error("Too many errors, Killing the consumer thread " + this);
- throw e;
- }
+ nanoCtr = System.nanoTime();
}
- nanoCtr = System.nanoTime();
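+ // Commit offsets synchronously after each processed batch; auto commit now defaults to false.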
+ consumer.commitSync();
}
-
- LOG.warn("Consumer thread [" + this + "] has run out of messages");
- } catch (ConsumerRebalanceFailedException crfex) {
- LOG.error("Failed to read Kafka partition from the consumer "
- + "thread " + this, crfex);
- group.incrementRebalanceFailures();
+ } catch (WakeupException e) {
+ if (!shouldClose.get()) {
+ LOG.error("Kafka thread " + this + " received a wakeup without shouldClose set; this is a bug", e);
+ }
} catch (Exception e) {
LOG.error("Unexpected exception in Kafka thread: " + this, e);
//incrementCounter(CounterType.DroppedException, null);
@@ -364,17 +353,19 @@ public void run() {
}
thread_running.set(false);
}
-
+
/**
- * Attempts to gracefully shutdown the thread, closing the consumer and waiting
- * for the inflight RPCs to complete.
+ * Closes the Kafka connection.
+ *
+ * Not thread safe; should only be called at the end of run
*/
- public void shutdown() {
+ @VisibleForTesting
+ void shutdown() {
LOG.info("Shutting down thread [" + this + "]");
try {
if (consumer != null) {
- consumer.shutdown();
- consumer = null;
+ consumer.close();
+ // Note: can't set consumer = null here, as it may race with `close` calling `wakeup`.
LOG.info("Shutdown the kafka consumer on thread [" + this + "]");
}
} catch (Exception e) {
@@ -391,6 +382,21 @@ public void shutdown() {
System.exit(1);
}
}
+
+ /**
+ * Tells the kafka thread to gracefully shut down, blocking until it does.
+ */
+ public void close() throws InterruptedException {
+ this.close(true);
+ }
+
+ public void close(boolean block) throws InterruptedException {
+ shouldClose.set(true);
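+ // Set the flag before wakeup() so the poll loop sees it when the WakeupException is thrown.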
+ if (consumer != null) {
+ consumer.wakeup();
+ }
+ if (block) {
+ this.join();
+ }
+ }
/**
* Increments a counter for the namespace in the map.
@@ -475,7 +481,7 @@ public TSDB getTSDB() {
}
@VisibleForTesting
- ConsumerConnector consumer() {
+ KafkaConsumer consumer() {
return consumer;
}
@@ -506,7 +512,7 @@ String getDebugString(final IncomingDataPoint dp) {
}
String getPrefix(final String metric) {
- if (Strings.isNullOrEmpty(metric)) {
+ if (metric == null || "".equals(metric)) {
return DEFAULT_COUNTER_ID;
}
diff --git a/src/main/java/net/opentsdb/tsd/KafkaSimplePartitioner.java b/src/main/java/net/opentsdb/tsd/KafkaSimplePartitioner.java
deleted file mode 100644
index 2f118ed..0000000
--- a/src/main/java/net/opentsdb/tsd/KafkaSimplePartitioner.java
+++ /dev/null
@@ -1,39 +0,0 @@
-// This file is part of OpenTSDB.
-// Copyright (C) 2018 The OpenTSDB Authors.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package net.opentsdb.tsd;
-
-import kafka.utils.VerifiableProperties;
-
-/**
- * Simple partitioner class. It assumes that our key is a positive integer
- * based on the hash code. If you pass in a string that can't parse to a Long,
- * then the partitioner will throw an exception.
- */
-public class KafkaSimplePartitioner implements kafka.producer.Partitioner {
-
- /**
- * Ctor required for Kafka
- * @param vb Properties to parse if necessary
- */
- public KafkaSimplePartitioner(final VerifiableProperties vb) {
-
- }
-
- @Override
- public int partition(final Object key, final int num_partitions) {
- return (int)(Long.parseLong((String) key) % num_partitions);
- }
-
-}
diff --git a/src/main/java/net/opentsdb/tsd/KafkaStorageExceptionHandler.java b/src/main/java/net/opentsdb/tsd/KafkaStorageExceptionHandler.java
index 7e8fb8a..317b116 100644
--- a/src/main/java/net/opentsdb/tsd/KafkaStorageExceptionHandler.java
+++ b/src/main/java/net/opentsdb/tsd/KafkaStorageExceptionHandler.java
@@ -20,6 +20,9 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,9 +30,6 @@
import com.google.common.base.Strings;
import com.stumbleupon.async.Deferred;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import net.opentsdb.core.IncomingDataPoint;
import net.opentsdb.core.TSDB;
import net.opentsdb.data.Aggregate;
@@ -66,10 +66,10 @@ public enum KafkaRequeueTopic {
private KafkaRpcPluginConfig config;
/** A Kafka producer configuration object */
- private ProducerConfig producer_config;
+ private Properties producer_config;
/** A Kafka producer */
- private Producer producer;
+ private KafkaProducer producer;
/**
* Default ctor
@@ -139,8 +139,11 @@ private void setKafkaConfig() {
entry.getValue());
}
}
+
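+ // String keys and Bytes values match the ProducerRecord built in handleError().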
+ properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.BytesSerializer");
- producer_config = new ProducerConfig(properties);
+ producer_config = properties;
}
@Override
@@ -149,7 +152,7 @@ public void initialize(TSDB tsdb) {
config = new KafkaRpcPluginConfig(tsdb.getConfig());
setKafkaConfig();
- producer = new Producer(producer_config);
+ producer = new KafkaProducer(producer_config);
LOG.info("Initialized kafka requeue publisher.");
}
@@ -204,10 +207,11 @@ public void handleError(final IncomingDataPoint dp,
}
int hash = dp.getMetric().hashCode() + Objects.hashCode(dp.getTags());
- final KeyedMessage data =
- new KeyedMessage(topic,
- Integer.toString(Math.abs(hash)),
- JSON.serializeToBytes(dp));
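+ // The default partitioner hashes the string key, replacing the deleted KafkaSimplePartitioner.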
+ final ProducerRecord data = new ProducerRecord(
+ topic,
+ Integer.toString(Math.abs(hash)),
+ new Bytes(JSON.serializeToBytes(dp))
+ );
producer.send(data);
final AtomicLong requeued = topic_requeued_counters.get(type);
diff --git a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginConfig.java b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginConfig.java
index 9e3a55a..331b351 100644
--- a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginConfig.java
+++ b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginConfig.java
@@ -15,14 +15,13 @@
package net.opentsdb.tsd;
import net.opentsdb.utils.Config;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.modules.junit4.PowerMockRunner;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
@RunWith(PowerMockRunner.class)
public class TestKafkaRpcPluginConfig {
@@ -34,7 +33,7 @@ public void defaults() throws Exception {
assertEquals(KafkaRpcPluginConfig.AUTO_COMMIT_INTERVAL_DEFAULT,
config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX +
KafkaRpcPluginConfig.AUTO_COMMIT_INTERVAL_MS));
- assertTrue(config.getBoolean(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX +
+ assertFalse(config.getBoolean(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX +
KafkaRpcPluginConfig.AUTO_COMMIT_ENABLE));
assertEquals(KafkaRpcPluginConfig.AUTO_OFFSET_RESET_DEFAULT,
config.getString(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX +
@@ -42,23 +41,9 @@ public void defaults() throws Exception {
assertEquals(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS_DEFAULT,
config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX +
KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS));
- assertEquals(KafkaRpcPluginConfig.REBALANCE_RETRIES_DEFAULT,
- config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX +
- KafkaRpcPluginConfig.REBALANCE_RETRIES));
- assertEquals(KafkaRpcPluginConfig.ZK_SESSION_TIMEOUT_DEFAULT,
- config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX +
- KafkaRpcPluginConfig.ZOOKEEPER_SESSION_TIMEOUT_MS));
- assertEquals(KafkaRpcPluginConfig.ZK_CONNECTION_TIMEOUT_DEFAULT,
- config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX +
- KafkaRpcPluginConfig.ZOOKEEPER_CONNECTION_TIMEOUT_MS));
assertEquals(0, config.getInt(KafkaRpcPluginConfig.REQUIRED_ACKS));
assertEquals(10000, config.getInt(KafkaRpcPluginConfig.REQUEST_TIMEOUT));
assertEquals(1000, config.getInt(KafkaRpcPluginConfig.MAX_RETRIES));
- assertEquals("async", config.getString(KafkaRpcPluginConfig.PRODUCER_TYPE));
- assertEquals("kafka.serializer.StringEncoder",
- config.getString(KafkaRpcPluginConfig.KEY_SERIALIZER));
- assertEquals("net.opentsdb.tsd.KafkaSimplePartitioner",
- config.getString(KafkaRpcPluginConfig.PARTITIONER_CLASS));
}
}
diff --git a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginGroup.java b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginGroup.java
index 2c13ecd..b602a13 100644
--- a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginGroup.java
+++ b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginGroup.java
@@ -343,8 +343,8 @@ public void setRate() throws Exception {
group.setRate(42);
assertEquals(42, group.getRate(), 0.000);
assertEquals(42, group.getRateLimiter().getRate(), 0.000);
- verify(threads.get(0), never()).shutdown();
- verify(threads.get(1), never()).shutdown();
+ verify(threads.get(0), never()).close();
+ verify(threads.get(1), never()).close();
}
@Test
@@ -355,8 +355,8 @@ public void setRateZero() throws Exception {
// remains the default as we can't set the limiter to 0
assertEquals(KafkaRpcPluginConfig.DEFAULT_CONSUMER_RATE,
group.getRateLimiter().getRate(), 0.000);
- verify(threads.get(0), times(1)).shutdown();
- verify(threads.get(1), times(1)).shutdown();
+ verify(threads.get(0), times(1)).close();
+ verify(threads.get(1), times(1)).close();
}
@Test
diff --git a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginThread.java b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginThread.java
index d276843..dc5b791 100644
--- a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginThread.java
+++ b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginThread.java
@@ -14,39 +14,9 @@
// limitations under the License.
package net.opentsdb.tsd;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyMap;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.TopicFilter;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.message.MessageAndMetadata;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.RateLimiter;
+import com.stumbleupon.async.Deferred;
import net.opentsdb.core.HistogramCodecManager;
import net.opentsdb.core.IncomingDataPoint;
import net.opentsdb.core.TSDB;
@@ -59,7 +29,11 @@
import net.opentsdb.tsd.KafkaRpcPluginGroup.TsdbConsumerType;
import net.opentsdb.utils.Config;
import net.opentsdb.utils.JSON;
-
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.utils.Bytes;
import org.hbase.async.HBaseException;
import org.hbase.async.PleaseThrottleException;
import org.junit.Before;
@@ -71,22 +45,34 @@
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.util.concurrent.RateLimiter;
-import com.stumbleupon.async.Deferred;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+import static org.powermock.api.mockito.PowerMockito.mock;
@PowerMockIgnore({ "javax.management.*" })
@RunWith(PowerMockRunner.class)
@PrepareForTest({ TSDB.class, KafkaRpcPluginThread.class, Config.class,
Thread.class,
- Consumer.class, ConsumerConfig.class, PleaseThrottleException.class})
+ KafkaConsumer.class, ConsumerConfig.class, PleaseThrottleException.class})
public class TestKafkaRpcPluginThread {
private static final String METRIC = "sys.cpu.user";
private static final String METRIC2 = "sys.cpu.iowait";
private static final String PREFIX = "sys";
private static final long TS = 1492641000L;
private static final String LOCALHOST = "localhost";
- private static final String TOPICS = "TSDB_600_1,TSDB_600_2,TSDB_600_3";
+ private static final List TOPICS = Arrays.asList("TSDB_600_1", "TSDB_600_2", "TSDB_600_3");
private static final String GROUPID = "testGroup";
private static final String ZKS = "192.168.1.1:2181";
private Map TAGS = ImmutableMap.builder()
@@ -96,10 +82,9 @@ public class TestKafkaRpcPluginThread {
private TSDB tsdb;
private KafkaRpcPluginConfig config;
private KafkaRpcPluginGroup group;
- private ConsumerConnector consumer_connector;
- private ConsumerIterator iterator;
- private List> stream_list;
- private MessageAndMetadata message;
+ private KafkaConsumer consumer;
+ private ConsumerRecords consumerRecords;
+ private ConsumerRecord consumerRecord;
private RateLimiter rate_limiter;
private TypedIncomingData data;
private KafkaRpcPlugin parent;
@@ -113,17 +98,14 @@ public void before() throws Exception {
tsdb = PowerMockito.mock(TSDB.class);
config = new KafkaRpcPluginConfig(new Config(false));
group = mock(KafkaRpcPluginGroup.class);
- message = mock(MessageAndMetadata.class);
rate_limiter = mock(RateLimiter.class);
requeue = mock(KafkaStorageExceptionHandler.class);
counters = new ConcurrentHashMap>();
deserializer = new JSONDeserializer();
-
- consumer_connector = mock(ConsumerConnector.class);
- mockStatic(Consumer.class);
- when(Consumer.createJavaConsumerConnector((ConsumerConfig) any()))
- .thenReturn(consumer_connector);
+ consumer = mock(KafkaConsumer.class);
+ consumerRecords = mock(ConsumerRecords.class);
+ consumerRecord = mock(ConsumerRecord.class);
when(tsdb.getConfig()).thenReturn(config);
when(tsdb.getStorageExceptionHandler()).thenReturn(requeue);
@@ -140,30 +122,18 @@ public void before() throws Exception {
when(group.getGroupID()).thenReturn(GROUPID);
when(group.getConsumerType()).thenReturn(TsdbConsumerType.RAW);
when(group.getDeserializer()).thenReturn(deserializer);
-
+
config.overrideConfig(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX
+ "zookeeper.connect", ZKS);
-
- stream_list = mock(List.class);
- when(consumer_connector.createMessageStreamsByFilter(
- (TopicFilter) any(), anyInt())).thenReturn(stream_list);
-
- final KafkaStream stream = mock(KafkaStream.class);
- when(stream_list.get(0)).thenReturn(stream);
- iterator = mock(ConsumerIterator.class);
- when(stream.iterator()).thenReturn(iterator);
-
- when(iterator.hasNext()).thenReturn(true).thenReturn(false);
- when(iterator.next()).thenReturn(message);
-
- PowerMockito.mockStatic(ConsumerConfig.class);
- PowerMockito.whenNew(ConsumerConfig.class).withAnyArguments()
- .thenReturn(mock(ConsumerConfig.class));
-
- PowerMockito.mockStatic(Consumer.class);
- when(Consumer.createJavaConsumerConnector(any(ConsumerConfig.class)))
- .thenReturn(consumer_connector);
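+ // PowerMock intercepts the KafkaConsumer constructor so buildConsumerConnector() receives the mock.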
+ PowerMockito.mockStatic(KafkaConsumer.class);
+ PowerMockito.whenNew(KafkaConsumer.class).withAnyArguments()
+ .thenReturn(consumer);
+ when(consumer.poll(anyLong()))
+ .thenReturn(consumerRecords)
+ .thenReturn(null);
+ when(consumerRecords.iterator())
+ .thenReturn(Collections.singletonList(consumerRecord).iterator());
}
@Test
@@ -224,7 +194,7 @@ public void ctorNullTopics() throws Exception {
@Test(expected = IllegalArgumentException.class)
public void ctorEmptyTopics() throws Exception {
- new KafkaRpcPluginThread(group, 1, "");
+ new KafkaRpcPluginThread(group, 1, Collections.emptyList());
}
@Test(expected = IllegalArgumentException.class)
@@ -281,15 +251,6 @@ public void buildConsumerPropertiesDefaults() throws Exception {
assertEquals(
Integer.toString(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS_DEFAULT),
props.get(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS));
- assertEquals(
- Integer.toString(KafkaRpcPluginConfig.REBALANCE_RETRIES_DEFAULT),
- props.get(KafkaRpcPluginConfig.REBALANCE_RETRIES));
- assertEquals(
- Integer.toString(KafkaRpcPluginConfig.ZK_SESSION_TIMEOUT_DEFAULT),
- props.get(KafkaRpcPluginConfig.ZOOKEEPER_SESSION_TIMEOUT_MS));
- assertEquals(
- Integer.toString(KafkaRpcPluginConfig.ZK_CONNECTION_TIMEOUT_DEFAULT),
- props.get(KafkaRpcPluginConfig.ZOOKEEPER_CONNECTION_TIMEOUT_MS));
assertEquals(ZKS, props.get("zookeeper.connect"));
}
@@ -316,15 +277,6 @@ public void buildConsumerPropertiesGroupOverride() throws Exception {
assertEquals(
Integer.toString(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS_DEFAULT),
props.get(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS));
- assertEquals(
- Integer.toString(KafkaRpcPluginConfig.REBALANCE_RETRIES_DEFAULT),
- props.get(KafkaRpcPluginConfig.REBALANCE_RETRIES));
- assertEquals(
- Integer.toString(KafkaRpcPluginConfig.ZK_SESSION_TIMEOUT_DEFAULT),
- props.get(KafkaRpcPluginConfig.ZOOKEEPER_SESSION_TIMEOUT_MS));
- assertEquals(
- Integer.toString(KafkaRpcPluginConfig.ZK_CONNECTION_TIMEOUT_DEFAULT),
- props.get(KafkaRpcPluginConfig.ZOOKEEPER_CONNECTION_TIMEOUT_MS));
assertEquals("10.0.0.1:2181", props.get("zookeeper.connect"));
}
@@ -340,31 +292,13 @@ public void shutdown() throws Exception {
final KafkaRpcPluginThread writer =
new KafkaRpcPluginThread(group, 1, TOPICS);
writer.run();
- writer.shutdown();
- verify(consumer_connector, times(1)).shutdown();
+ writer.close();
+ verify(consumer, times(1)).close();
}
-
- @Test
- public void runNoData() throws Exception {
- when(iterator.hasNext()).thenReturn(false);
- final KafkaRpcPluginThread writer = Mockito.spy(
- new KafkaRpcPluginThread(group, 1, TOPICS));
- writer.run();
- verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap());
- verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(),
- any(byte[].class), anyMap());
- verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(),
- anyMap(), anyBoolean(), anyString(), anyString(), anyString());
- verify(consumer_connector, times(1))
- .createMessageStreamsByFilter(any(TopicFilter.class), anyInt());
- verify(writer, times(1)).shutdown();
- verify(consumer_connector, times(1)).shutdown();
- }
-
@Test
public void runNoDataRestart() throws Exception {
- when(iterator.hasNext()).thenReturn(false);
+ when(consumer.poll(anyLong())).thenReturn(null);
final KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
@@ -375,29 +309,8 @@ public void runNoDataRestart() throws Exception {
any(byte[].class), anyMap());
verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(),
anyMap(), anyBoolean(), anyString(), anyString(), anyString());
- verify(consumer_connector, times(2))
- .createMessageStreamsByFilter(any(TopicFilter.class), anyInt());
verify(writer, times(2)).shutdown();
- verify(consumer_connector, times(2)).shutdown();
- }
-
- @Test
- public void runNoStreams() throws Exception {
- when(stream_list.get(0))
- .thenThrow(new ArrayIndexOutOfBoundsException());
-
- KafkaRpcPluginThread writer = Mockito.spy(
- new KafkaRpcPluginThread(group, 1, TOPICS));
- writer.run();
- verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap());
- verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(),
- any(byte[].class), anyMap());
- verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(),
- anyMap(), anyBoolean(), anyString(), anyString(), anyString());
- verify(consumer_connector, times(1))
- .createMessageStreamsByFilter(any(TopicFilter.class), anyInt());
- verify(writer, times(1)).shutdown();
- verify(consumer_connector, times(1)).shutdown();
+ verify(consumer, times(2)).close();
}
@Test
@@ -541,7 +454,7 @@ public void runGoodMessageRequeueHistogram() throws Exception {
@Test
public void runEmptyData() throws Exception {
- when(message.message()).thenReturn(new byte[] { '{', '}' });
+ when(consumerRecord.value()).thenReturn(new Bytes(new byte[] { '{', '}' }));
KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
writer.run();
@@ -559,7 +472,7 @@ public void runEmptyMetric() throws Exception {
when(tsdb.addPoint(anyString(), anyLong(), anyLong(), anyMap()))
.thenReturn(Deferred.fromResult(null));
data = new Metric(null, TS, "42", TAGS);
- when(message.message()).thenReturn(JSON.serializeToBytes(data));
+ when(consumerRecord.value()).thenReturn(new Bytes(JSON.serializeToBytes(data)));
KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
@@ -674,80 +587,23 @@ public void runRequeueThrottling() throws Exception {
@Test
public void runTooManyRuntimeException() throws Exception {
- when(iterator.hasNext()).thenReturn(true);
- when(iterator.next()).thenThrow(new RuntimeException("Foobar"));
- KafkaRpcPluginThread writer = Mockito.spy(
- new KafkaRpcPluginThread(group, 1, TOPICS));
- writer.run();
-
- verify(writer, times(1)).shutdown();
- verify(consumer_connector, times(1)).shutdown();
- }
-
- @Test
- public void runIteratorHasNextRuntimeException() throws Exception {
- when(iterator.hasNext()).thenThrow(new RuntimeException("Foobar"));
+ when(consumerRecords.iterator()).thenThrow(new RuntimeException("Foobar"));
KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
writer.run();
verify(writer, times(1)).shutdown();
- verify(consumer_connector, times(1)).shutdown();
+ verify(consumer, times(1)).close();
}
@Test(expected = Exception.class)
- public void runIteratorHasNextException() throws Exception {
- when(iterator.hasNext()).thenThrow(new Exception("Foobar"));
+ public void consumerRecordsHasException() throws Exception {
+ when(consumerRecords.iterator()).thenThrow(new Exception("Foobar"));
KafkaRpcPluginThread writer = Mockito.spy(
new KafkaRpcPluginThread(group, 1, TOPICS));
writer.run();
}
- @Test
- public void runIteratorNextRuntimeException() throws Exception {
- when(iterator.next()).thenThrow(new RuntimeException("Foobar"));
- KafkaRpcPluginThread writer = Mockito.spy(
- new KafkaRpcPluginThread(group, 1, TOPICS));
- writer.run();
-
- verify(writer, times(1)).shutdown();
- verify(consumer_connector, times(1)).shutdown();
- }
-
- @Test(expected = Exception.class)
- public void runIteratorNextException() throws Exception {
- when(iterator.next()).thenThrow(new Exception("Foobar"));
- KafkaRpcPluginThread writer = Mockito.spy(
- new KafkaRpcPluginThread(group, 1, TOPICS));
- writer.run();
- }
-
- @Test
- public void runConsumerRuntimeException() throws Exception {
- when(consumer_connector.createMessageStreamsByFilter(
- (TopicFilter) any(), anyInt())).thenThrow(
- new RuntimeException("Foobar"));
- KafkaRpcPluginThread writer = Mockito.spy(
- new KafkaRpcPluginThread(group, 1, TOPICS));
- writer.run();
-
- verify(writer, times(1)).shutdown();
- verify(consumer_connector, times(1)).shutdown();
- }
-
- @Test(expected = Exception.class)
- public void runConsumerException() throws Exception {
- when(consumer_connector.createMessageStreamsByFilter(
- (TopicFilter) any(), anyInt())).thenThrow(
- new Exception("Foobar"));
- KafkaRpcPluginThread writer = Mockito.spy(
- new KafkaRpcPluginThread(group, 1, TOPICS));
- writer.run();
-
- verify(writer, times(1)).shutdown();
- verify(consumer_connector, times(1)).shutdown();
- }
-
// ------ HELPERS ---------
private void setupRawData(final boolean requeued) {
@@ -757,7 +613,7 @@ private void setupRawData(final boolean requeued) {
if (requeued) {
data.setRequeueTS(TS + 60);
}
- when(message.message()).thenReturn(JSON.serializeToBytes(data));
+ when(consumerRecord.value()).thenReturn(new Bytes(JSON.serializeToBytes(data)));
}
private void setupRawDataList(final boolean requeued) {
@@ -779,7 +635,7 @@ private void setupRawDataList(final boolean requeued) {
sb.append(new String(JSON.serializeToBytes(ev2)));
sb.append("]");
- when(message.message()).thenReturn(sb.toString().getBytes());
+ when(consumerRecord.value()).thenReturn(new Bytes(sb.toString().getBytes()));
}
private void setupAggData(final boolean requeued, final boolean is_group_by) {
@@ -794,7 +650,7 @@ private void setupAggData(final boolean requeued, final boolean is_group_by) {
if (requeued) {
data.setRequeueTS(TS + 60);
}
- when(message.message()).thenReturn(JSON.serializeToBytes(data));
+ when(consumerRecord.value()).thenReturn(new Bytes(JSON.serializeToBytes(data)));
}
private void setupHistogramData(final boolean requeued) {
@@ -819,7 +675,7 @@ private void setupHistogramData(final boolean requeued) {
if (requeued) {
data.setRequeueTS(TS + 60);
}
- when(message.message()).thenReturn(JSON.serializeToBytes(data));
+ when(consumerRecord.value()).thenReturn(new Bytes(JSON.serializeToBytes(data)));
}
/**
@@ -908,10 +764,7 @@ private void verifyCtrsZero(final String[] types, final String namespace) {
private void verifyMessageRead(final KafkaRpcPluginThread writer,
final boolean requeued) {
verify(writer, times(1)).shutdown();
- verify(consumer_connector, times(1)).shutdown();
- verify(consumer_connector, times(1))
- .createMessageStreamsByFilter(any(TopicFilter.class), anyInt());
- verify(iterator, times(2)).hasNext();
+ verify(consumer, times(1)).close();
if (requeued) {
verify(requeue, times(1)).handleError(
any(IncomingDataPoint.class), any(Exception.class));