From 119bd8549223c1e63bbb3258627a140eadab7e55 Mon Sep 17 00:00:00 2001 From: Alex Parrill Date: Tue, 12 Mar 2019 11:03:42 -0400 Subject: [PATCH] Update Kafka client The current client is several major versions old and uses deprecated functionality that doesn't play nice with other clients (zookeeper offsets). --- pom.xml | 5 +- .../opentsdb/tsd/KafkaRpcPluginConfig.java | 42 +-- .../net/opentsdb/tsd/KafkaRpcPluginGroup.java | 18 +- .../opentsdb/tsd/KafkaRpcPluginThread.java | 260 +++++++++--------- .../opentsdb/tsd/KafkaSimplePartitioner.java | 39 --- .../tsd/KafkaStorageExceptionHandler.java | 26 +- .../tsd/TestKafkaRpcPluginConfig.java | 23 +- .../opentsdb/tsd/TestKafkaRpcPluginGroup.java | 8 +- .../tsd/TestKafkaRpcPluginThread.java | 259 ++++------------- 9 files changed, 232 insertions(+), 448 deletions(-) delete mode 100644 src/main/java/net/opentsdb/tsd/KafkaSimplePartitioner.java diff --git a/pom.xml b/pom.xml index a7fea4f..8b1ac7d 100644 --- a/pom.xml +++ b/pom.xml @@ -55,8 +55,8 @@ org.apache.kafka - kafka_2.9.2 - 0.8.1.1 + kafka-clients + 2.1.1 @@ -205,7 +205,6 @@ org.scala-lang:* com.yammer.metrics:* org.xerial.snappy:* - net.sf.jopt-simple:* com.101tec:* log4j:* diff --git a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginConfig.java b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginConfig.java index 147a61b..f1cce09 100644 --- a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginConfig.java +++ b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginConfig.java @@ -14,10 +14,10 @@ // limitations under the License. package net.opentsdb.tsd; -import java.util.Map; - import net.opentsdb.utils.Config; +import java.util.Map; + /** * The configuration class for the Kafka Consumer */ @@ -37,23 +37,15 @@ public class KafkaRpcPluginConfig extends Config { // KAFKA public static final String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms"; - public static final String AUTO_COMMIT_ENABLE = "auto.commit.enable"; + public static final String AUTO_COMMIT_ENABLE = "enable.auto.commit"; public static final String AUTO_OFFSET_RESET = "auto.offset.reset"; - public static final String REBALANCE_BACKOFF_MS = "rebalance.backoff.ms"; - public static final String REBALANCE_RETRIES = "rebalance.retries.max"; - public static final String ZOOKEEPER_CONNECTION_TIMEOUT_MS = - "zookeeper.connection.timeout.ms"; - public static final String ZOOKEEPER_SESSION_TIMEOUT_MS = - "zookeeper.session.timeout.ms"; + public static final String REBALANCE_BACKOFF_MS = "rebalance.timeout.ms"; // KAFKA DEFAULTS public static final int AUTO_COMMIT_INTERVAL_DEFAULT = 5000; - public static final String AUTO_COMMIT_ENABLE_DEFAULT = "true"; - public static final String AUTO_OFFSET_RESET_DEFAULT = "smallest"; + public static final String AUTO_COMMIT_ENABLE_DEFAULT = "false"; + public static final String AUTO_OFFSET_RESET_DEFAULT = "latest"; public static final int REBALANCE_BACKOFF_MS_DEFAULT = 60000; - public static final int REBALANCE_RETRIES_DEFAULT = 20; - public static final int ZK_CONNECTION_TIMEOUT_DEFAULT = 60000; - public static final int ZK_SESSION_TIMEOUT_DEFAULT = 60000; // Producer defaults for the requeue public static final String REQUEUE_CONFIG_PREFIX = PLUGIN_PROPERTY_BASE + "seh"; @@ -61,19 +53,13 @@ public class KafkaRpcPluginConfig extends Config { // Kafka pass through values public static final String KAFKA_BROKERS = KAFKA_CONFIG_PREFIX + - "metadata.broker.list"; + "bootstrap.servers"; public static final String REQUIRED_ACKS = KAFKA_CONFIG_PREFIX + - "request.required.acks"; + "acks"; public static final String REQUEST_TIMEOUT = KAFKA_CONFIG_PREFIX + "request.timeout.ms"; public static final String MAX_RETRIES = KAFKA_CONFIG_PREFIX + - "send.max_retries"; - public static final String PARTITIONER_CLASS = KAFKA_CONFIG_PREFIX + - "partitioner.class"; - public static final String PRODUCER_TYPE = KAFKA_CONFIG_PREFIX + - "producer.type"; - public static final String KEY_SERIALIZER = KAFKA_CONFIG_PREFIX + - "key.serializer.class"; + "retries"; /** * Default ctor @@ -104,22 +90,12 @@ protected void setLocalDefaults() { AUTO_OFFSET_RESET_DEFAULT); default_map.put(KAFKA_CONFIG_PREFIX + REBALANCE_BACKOFF_MS, Integer.toString(REBALANCE_BACKOFF_MS_DEFAULT)); - default_map.put(KAFKA_CONFIG_PREFIX + REBALANCE_RETRIES, - Integer.toString(REBALANCE_RETRIES_DEFAULT)); - default_map.put(KAFKA_CONFIG_PREFIX + ZOOKEEPER_SESSION_TIMEOUT_MS, - Integer.toString(ZK_SESSION_TIMEOUT_DEFAULT)); - default_map.put(KAFKA_CONFIG_PREFIX + ZOOKEEPER_CONNECTION_TIMEOUT_MS, - Integer.toString(ZK_CONNECTION_TIMEOUT_DEFAULT)); default_map.put(METRIC_AGG_FREQUENCY, Integer.toString(DEFAULT_METRIC_AGG_FREQUENCY)); default_map.put(REQUIRED_ACKS, "0"); default_map.put(REQUEST_TIMEOUT, "10000"); default_map.put(MAX_RETRIES, "1000"); - default_map.put(PRODUCER_TYPE, "async"); - default_map.put(KEY_SERIALIZER, "kafka.serializer.StringEncoder"); - default_map.put(PARTITIONER_CLASS, - "net.opentsdb.tsd.KafkaSimplePartitioner"); for (Map.Entry entry : default_map.entrySet()) { if (!hasProperty(entry.getKey())) { diff --git a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java index a110e9d..2c93d2d 100644 --- a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java +++ b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginGroup.java @@ -15,10 +15,7 @@ package net.opentsdb.tsd; import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -32,7 +29,6 @@ import com.google.common.util.concurrent.RateLimiter; -import joptsimple.internal.Strings; import net.opentsdb.data.deserializers.Deserializer; import net.opentsdb.utils.PluginLoader; @@ -133,7 +129,7 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) { final String deser_class = config.getString( KafkaRpcPluginConfig.PLUGIN_PROPERTY_BASE + groupID + ".deserializer"); - if (Strings.isNullOrEmpty(deser_class)) { + if (deser_class == null || deser_class.isEmpty()) { throw new IllegalArgumentException("Deserializer class cannot be null or empty."); } @@ -177,7 +173,7 @@ public KafkaRpcPluginGroup(final KafkaRpcPlugin parent, final String groupID) { } for (int i = 0; i < num_threads; i++) { - kafka_consumers.add(new KafkaRpcPluginThread(this, i, topics)); + kafka_consumers.add(new KafkaRpcPluginThread(this, i, Arrays.asList(topics.split(",")))); } timer.newTimeout(this, config.threadCheckInterval(), TimeUnit.MILLISECONDS); @@ -213,7 +209,11 @@ public void start() { /** Gracefully shuts down all of the consumer threads */ public void shutdown() { for (final KafkaRpcPluginThread consumer : kafka_consumers) { - consumer.shutdown(); + try { + consumer.close(); + } catch (InterruptedException e) { + LOG.error("Unexpected interruptedexception while closing consumer "+consumer); + } } } @@ -363,7 +363,7 @@ public void setRate(final double rate) { LOG.info("The rate has been set to zero for " + this + ". Killing threads."); for (final KafkaRpcPluginThread writer : kafka_consumers) { try { - writer.shutdown(); + writer.close(); } catch (Exception e) { LOG.error("Exception shutting down thread " + writer, e); } diff --git a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginThread.java b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginThread.java index f83eea1..fb5590b 100644 --- a/src/main/java/net/opentsdb/tsd/KafkaRpcPluginThread.java +++ b/src/main/java/net/opentsdb/tsd/KafkaRpcPluginThread.java @@ -17,34 +17,25 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.AtomicDouble; import com.google.common.util.concurrent.RateLimiter; - -import joptsimple.internal.Strings; - -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; - -import kafka.common.ConsumerRebalanceFailedException; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.consumer.TopicFilter; -import kafka.consumer.Whitelist; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; import net.opentsdb.core.IncomingDataPoint; import net.opentsdb.core.TSDB; import net.opentsdb.data.TypedIncomingData; import net.opentsdb.data.deserializers.Deserializer; import net.opentsdb.tsd.KafkaRpcPluginGroup.TsdbConsumerType; - +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + /** * New and Improved Kafka to TSDB Writer! Now with TSDB Integration! *

@@ -54,9 +45,8 @@ * the value is logged and bit-bucketed. */ public class KafkaRpcPluginThread extends Thread { - static final String CONSUMER_ID = "consumer.id"; + static final String CONSUMER_ID = "client.id"; static final String GROUP_ID = "group.id"; - static final String AGG_TAG = "_aggregate"; static final String DEFAULT_COUNTER_ID = "null"; private static final Logger LOG = LoggerFactory.getLogger( KafkaRpcPluginThread.class); @@ -119,12 +109,13 @@ public String toString() { private final int thread_id; private final KafkaRpcPluginGroup group; private final String consumer_id; - private final TopicFilter topic_filter; + private final List topics; private final int number_consumer_streams = 1; private final RateLimiter rate_limiter; private final TsdbConsumerType consumer_type; private final long requeue_delay; private final AtomicBoolean thread_running = new AtomicBoolean(); + private final AtomicBoolean shouldClose = new AtomicBoolean(); private final AtomicLong messagesReceived = new AtomicLong(); private final AtomicLong datapointsReceived = new AtomicLong(); @@ -132,8 +123,8 @@ public String toString() { private final AtomicDouble cumulativeRateDelay = new AtomicDouble(); private final AtomicDouble kafkaWaitTime = new AtomicDouble(); private final Deserializer deserializer; - - private ConsumerConnector consumer; + + private KafkaConsumer consumer; /** * Default ctor @@ -142,7 +133,7 @@ public String toString() { * @param topics The topic list to subscribe to */ public KafkaRpcPluginThread(final KafkaRpcPluginGroup group, - final int threadID, final String topics) { + final int threadID, final List topics) { if (topics == null || topics.isEmpty()) { throw new IllegalArgumentException("Missing topics"); } @@ -173,7 +164,7 @@ public KafkaRpcPluginThread(final KafkaRpcPluginGroup group, this.consumer_type = group.getConsumerType(); thread_running.set(false); - topic_filter = new Whitelist(topics); + this.topics = new ArrayList(topics); consumer_id = threadID + "_" + group.getParent().getHost(); if (consumer_type == TsdbConsumerType.REQUEUE_RAW) { if (group.getParent().getConfig().hasProperty( @@ -200,10 +191,12 @@ public String toString() { * @return The consumer connector * @throws IllegalArgumentException if the config is invalid */ - ConsumerConnector buildConsumerConnector() { + KafkaConsumer buildConsumerConnector() { final Properties properties = buildConsumerProperties(); - return kafka.consumer.Consumer - .createJavaConsumerConnector(new ConsumerConfig(properties)); + final KafkaConsumer consumer = new KafkaConsumer(properties); + consumer.subscribe(topics); + LOG.info("Initializing consumer. consumer id: " + consumer_id + ", topics: " + topics.toString() + ", props: " + properties); + return consumer; } /** @@ -240,8 +233,8 @@ Properties buildConsumerProperties() { // settings from other parts of our code base properties.put(GROUP_ID, group.getGroupID()); properties.put(CONSUMER_ID, consumer_id); - LOG.info("Initializing consumer config with consumer id " + consumer_id + - " and props " + properties); + properties.put("key.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"); + properties.put("value.deserializer", "org.apache.kafka.common.serialization.BytesDeserializer"); return properties; } @@ -249,105 +242,101 @@ Properties buildConsumerProperties() { public void run() { thread_running.set(true); Thread.currentThread().setName(group.getGroupID() + "_" + consumer_id); - + try { consumer = buildConsumerConnector(); - // We're only fetching one stream at a time per thread so we only care - // about the first one. - final KafkaStream stream = - consumer.createMessageStreamsByFilter(topic_filter, - number_consumer_streams).get(0); - final ConsumerIterator it = stream.iterator(); int errorCount = 0; long nanoCtr = System.nanoTime(); - while (it.hasNext()) { - try { - // The following line will block until a message has been read from - // the stream. - final MessageAndMetadata message = it.next(); - long delay = System.nanoTime() - nanoCtr; - if (nanoCtr > 0) { - kafkaWaitTime.addAndGet(((double) delay / (double)1000000)); - } - messagesReceived.incrementAndGet(); - - // Now that we have received the message, we should record the present - // system time. - final long recvTime = System.currentTimeMillis(); - - switch (consumer_type) { - case RAW: - case ROLLUP: - // Deserialize the event from the received (opaque) message. - final List eventList = - deserializer.deserialize(this, message.message()); - if (eventList == null) { - deserializationErrors.incrementAndGet(); - continue; - } - - // I ♡ Google! It's so easy! No release necessary! Thread Safe! - final double waiting = rate_limiter.acquire(); - cumulativeRateDelay.addAndGet(waiting); - datapointsReceived.addAndGet(eventList.size()); - for (TypedIncomingData ev : eventList) { - ev.processData(this, recvTime); - } - break; - case REQUEUE_RAW: - case REQUEUE_ROLLUP: - case UID_ABUSE: - final List requeuedList = - deserializer.deserialize(this, message.message()); - if (requeuedList == null) { - deserializationErrors.incrementAndGet(); - continue; + while (!shouldClose.get()) { + ConsumerRecords records = consumer.poll(Long.MAX_VALUE); + if(records == null) { + break; + } + + for (ConsumerRecord record : records) { + try { + long delay = System.nanoTime() - nanoCtr; + if (nanoCtr > 0) { + kafkaWaitTime.addAndGet(((double) delay / (double) 1000000)); } - - // to avoid tight requeue loops we want to sleep a spell - // if we receive a data point that was recently added - // to the queue - datapointsReceived.addAndGet(requeuedList.size()); - for(TypedIncomingData ev : requeuedList) { - final long requeueDiff = System.currentTimeMillis() - ev.getRequeueTS(); - if (requeueDiff < requeue_delay) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping for " + (requeue_delay - requeueDiff) - + " ms due to requeue delay"); + messagesReceived.incrementAndGet(); + + // Now that we have received the message, we should record the present + // system time. + final long recvTime = System.currentTimeMillis(); + + switch (consumer_type) { + case RAW: + case ROLLUP: + // Deserialize the event from the received (opaque) message. + final List eventList = + deserializer.deserialize(this, record.value().get()); + if (eventList == null) { + deserializationErrors.incrementAndGet(); + continue; } - //incrementCounter(CounterType.RequeuesDelayed, ns); - Thread.sleep(requeue_delay - requeueDiff); - } - // I ♡ Google! It's so easy! No release necessary! Thread Safe! - final double w = rate_limiter.acquire(); - cumulativeRateDelay.addAndGet(w); - ev.processData(this, recvTime); - } - break; - default: - throw new IllegalStateException("Unknown consumer type: " + + // I ♡ Google! It's so easy! No release necessary! Thread Safe! + final double waiting = rate_limiter.acquire(); + cumulativeRateDelay.addAndGet(waiting); + datapointsReceived.addAndGet(eventList.size()); + for (TypedIncomingData ev : eventList) { + ev.processData(this, recvTime); + } + break; + case REQUEUE_RAW: + case REQUEUE_ROLLUP: + case UID_ABUSE: + final List requeuedList = + deserializer.deserialize(this, record.value().get()); + if (requeuedList == null) { + deserializationErrors.incrementAndGet(); + continue; + } + + // to avoid tight requeue loops we want to sleep a spell + // if we receive a data point that was recently added + // to the queue + datapointsReceived.addAndGet(requeuedList.size()); + for (TypedIncomingData ev : requeuedList) { + final long requeueDiff = System.currentTimeMillis() - ev.getRequeueTS(); + if (requeueDiff < requeue_delay) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sleeping for " + (requeue_delay - requeueDiff) + + " ms due to requeue delay"); + } + //incrementCounter(CounterType.RequeuesDelayed, ns); + Thread.sleep(requeue_delay - requeueDiff); + } + + // I ♡ Google! It's so easy! No release necessary! Thread Safe! + final double w = rate_limiter.acquire(); + cumulativeRateDelay.addAndGet(w); + ev.processData(this, recvTime); + } + break; + default: + throw new IllegalStateException("Unknown consumer type: " + consumer_type + " for " + this); + } + errorCount = 0; + } catch (RuntimeException e) { + LOG.error("Exception in kafkaReader or Tsdb Writer ", e); + incrementNamespaceCounter(CounterType.Exception, null); + errorCount++; + if (errorCount >= 5) { + LOG.error("Too many errors, Killing the consumer thread " + this); + throw e; + } } - errorCount = 0; - } catch (RuntimeException e) { - LOG.error("Exception in kafkaReader or Tsdb Writer ", e); - incrementNamespaceCounter(CounterType.Exception, null); - errorCount++; - if (errorCount >= 5) { - LOG.error("Too many errors, Killing the consumer thread " + this); - throw e; - } + nanoCtr = System.nanoTime(); } - nanoCtr = System.nanoTime(); + consumer.commitSync(); } - - LOG.warn("Consumer thread [" + this + "] has run out of messages"); - } catch (ConsumerRebalanceFailedException crfex) { - LOG.error("Failed to read Kafka partition from the consumer " - + "thread " + this, crfex); - group.incrementRebalanceFailures(); + } catch(WakeupException e) { + if(!shouldClose.get()) + LOG.error("Kafka thread "+this+" got wakeup event without shouldClose set; this is a bug"); } catch (Exception e) { LOG.error("Unexpected exception in Kafka thread: " + this, e); //incrementCounter(CounterType.DroppedException, null); @@ -364,17 +353,19 @@ public void run() { } thread_running.set(false); } - + /** - * Attempts to gracefully shutdown the thread, closing the consumer and waiting - * for the inflight RPCs to complete. + * Closes the Kafka connection. + * + * Not thread safe; should only be called at the end of run */ - public void shutdown() { + @VisibleForTesting + void shutdown() { LOG.info("Shutting down thread [" + this + "]"); try { if (consumer != null) { - consumer.shutdown(); - consumer = null; + consumer.close(); + // Note: can't set consumer = null here, as it may race with `close` calling `wakeup`. LOG.info("Shutdown the kafka consumer on thread [" + this + "]"); } } catch (Exception e) { @@ -391,6 +382,21 @@ public void shutdown() { System.exit(1); } } + + /** + * Tells the kafka thread to gracefully shut down, blocking until it does. + */ + public void close() throws InterruptedException { + this.close(true); + } + + public void close(boolean block) throws InterruptedException { + shouldClose.set(true); + if(consumer != null) + consumer.wakeup(); + if(block) + this.join(); + } /** * Increments a counter for the namespace in the map. @@ -475,7 +481,7 @@ public TSDB getTSDB() { } @VisibleForTesting - ConsumerConnector consumer() { + KafkaConsumer consumer() { return consumer; } @@ -506,7 +512,7 @@ String getDebugString(final IncomingDataPoint dp) { } String getPrefix(final String metric) { - if (Strings.isNullOrEmpty(metric)) { + if (metric == null || "".equals(metric)) { return DEFAULT_COUNTER_ID; } diff --git a/src/main/java/net/opentsdb/tsd/KafkaSimplePartitioner.java b/src/main/java/net/opentsdb/tsd/KafkaSimplePartitioner.java deleted file mode 100644 index 2f118ed..0000000 --- a/src/main/java/net/opentsdb/tsd/KafkaSimplePartitioner.java +++ /dev/null @@ -1,39 +0,0 @@ -// This file is part of OpenTSDB. -// Copyright (C) 2018 The OpenTSDB Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package net.opentsdb.tsd; - -import kafka.utils.VerifiableProperties; - -/** - * Simple partitioner class. It assumes that our key is a positive integer - * based on the hash code. If you pass in a string that can't parse to a Long, - * then the partitioner will throw an exception. - */ -public class KafkaSimplePartitioner implements kafka.producer.Partitioner { - - /** - * Ctor required for Kafka - * @param vb Properties to parse if necessary - */ - public KafkaSimplePartitioner(final VerifiableProperties vb) { - - } - - @Override - public int partition(final Object key, final int num_partitions) { - return (int)(Long.parseLong((String) key) % num_partitions); - } - -} diff --git a/src/main/java/net/opentsdb/tsd/KafkaStorageExceptionHandler.java b/src/main/java/net/opentsdb/tsd/KafkaStorageExceptionHandler.java index 7e8fb8a..317b116 100644 --- a/src/main/java/net/opentsdb/tsd/KafkaStorageExceptionHandler.java +++ b/src/main/java/net/opentsdb/tsd/KafkaStorageExceptionHandler.java @@ -20,6 +20,9 @@ import java.util.Properties; import java.util.concurrent.atomic.AtomicLong; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.utils.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,9 +30,6 @@ import com.google.common.base.Strings; import com.stumbleupon.async.Deferred; -import kafka.javaapi.producer.Producer; -import kafka.producer.KeyedMessage; -import kafka.producer.ProducerConfig; import net.opentsdb.core.IncomingDataPoint; import net.opentsdb.core.TSDB; import net.opentsdb.data.Aggregate; @@ -66,10 +66,10 @@ public enum KafkaRequeueTopic { private KafkaRpcPluginConfig config; /** A Kafka producer configuration object */ - private ProducerConfig producer_config; + private Properties producer_config; /** A Kafka producer */ - private Producer producer; + private KafkaProducer producer; /** * Default ctor @@ -139,8 +139,11 @@ private void setKafkaConfig() { entry.getValue()); } } + + properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.BytesSerializer"); - producer_config = new ProducerConfig(properties); + producer_config = properties; } @Override @@ -149,7 +152,7 @@ public void initialize(TSDB tsdb) { config = new KafkaRpcPluginConfig(tsdb.getConfig()); setKafkaConfig(); - producer = new Producer(producer_config); + producer = new KafkaProducer(producer_config); LOG.info("Initialized kafka requeue publisher."); } @@ -204,10 +207,11 @@ public void handleError(final IncomingDataPoint dp, } int hash = dp.getMetric().hashCode() + Objects.hashCode(dp.getTags()); - final KeyedMessage data = - new KeyedMessage(topic, - Integer.toString(Math.abs(hash)), - JSON.serializeToBytes(dp)); + final ProducerRecord data = new ProducerRecord( + topic, + Integer.toString(Math.abs(hash)), + new Bytes(JSON.serializeToBytes(dp)) + ); producer.send(data); final AtomicLong requeued = topic_requeued_counters.get(type); diff --git a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginConfig.java b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginConfig.java index 9e3a55a..331b351 100644 --- a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginConfig.java +++ b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginConfig.java @@ -15,14 +15,13 @@ package net.opentsdb.tsd; import net.opentsdb.utils.Config; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.modules.junit4.PowerMockRunner; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + @RunWith(PowerMockRunner.class) public class TestKafkaRpcPluginConfig { @@ -34,7 +33,7 @@ public void defaults() throws Exception { assertEquals(KafkaRpcPluginConfig.AUTO_COMMIT_INTERVAL_DEFAULT, config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + KafkaRpcPluginConfig.AUTO_COMMIT_INTERVAL_MS)); - assertTrue(config.getBoolean(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + + assertFalse(config.getBoolean(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + KafkaRpcPluginConfig.AUTO_COMMIT_ENABLE)); assertEquals(KafkaRpcPluginConfig.AUTO_OFFSET_RESET_DEFAULT, config.getString(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + @@ -42,23 +41,9 @@ public void defaults() throws Exception { assertEquals(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS_DEFAULT, config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS)); - assertEquals(KafkaRpcPluginConfig.REBALANCE_RETRIES_DEFAULT, - config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + - KafkaRpcPluginConfig.REBALANCE_RETRIES)); - assertEquals(KafkaRpcPluginConfig.ZK_SESSION_TIMEOUT_DEFAULT, - config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + - KafkaRpcPluginConfig.ZOOKEEPER_SESSION_TIMEOUT_MS)); - assertEquals(KafkaRpcPluginConfig.ZK_CONNECTION_TIMEOUT_DEFAULT, - config.getInt(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + - KafkaRpcPluginConfig.ZOOKEEPER_CONNECTION_TIMEOUT_MS)); assertEquals(0, config.getInt(KafkaRpcPluginConfig.REQUIRED_ACKS)); assertEquals(10000, config.getInt(KafkaRpcPluginConfig.REQUEST_TIMEOUT)); assertEquals(1000, config.getInt(KafkaRpcPluginConfig.MAX_RETRIES)); - assertEquals("async", config.getString(KafkaRpcPluginConfig.PRODUCER_TYPE)); - assertEquals("kafka.serializer.StringEncoder", - config.getString(KafkaRpcPluginConfig.KEY_SERIALIZER)); - assertEquals("net.opentsdb.tsd.KafkaSimplePartitioner", - config.getString(KafkaRpcPluginConfig.PARTITIONER_CLASS)); } } diff --git a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginGroup.java b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginGroup.java index 2c13ecd..b602a13 100644 --- a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginGroup.java +++ b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginGroup.java @@ -343,8 +343,8 @@ public void setRate() throws Exception { group.setRate(42); assertEquals(42, group.getRate(), 0.000); assertEquals(42, group.getRateLimiter().getRate(), 0.000); - verify(threads.get(0), never()).shutdown(); - verify(threads.get(1), never()).shutdown(); + verify(threads.get(0), never()).close(); + verify(threads.get(1), never()).close(); } @Test @@ -355,8 +355,8 @@ public void setRateZero() throws Exception { // remains the default as we can't set the limiter to 0 assertEquals(KafkaRpcPluginConfig.DEFAULT_CONSUMER_RATE, group.getRateLimiter().getRate(), 0.000); - verify(threads.get(0), times(1)).shutdown(); - verify(threads.get(1), times(1)).shutdown(); + verify(threads.get(0), times(1)).close(); + verify(threads.get(1), times(1)).close(); } @Test diff --git a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginThread.java b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginThread.java index d276843..dc5b791 100644 --- a/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginThread.java +++ b/src/test/java/net/opentsdb/tsd/TestKafkaRpcPluginThread.java @@ -14,39 +14,9 @@ // limitations under the License. package net.opentsdb.tsd; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Matchers.anyMap; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.mockStatic; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicLong; - -import kafka.consumer.Consumer; -import kafka.consumer.ConsumerConfig; -import kafka.consumer.ConsumerIterator; -import kafka.consumer.KafkaStream; -import kafka.consumer.TopicFilter; -import kafka.javaapi.consumer.ConsumerConnector; -import kafka.message.MessageAndMetadata; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.RateLimiter; +import com.stumbleupon.async.Deferred; import net.opentsdb.core.HistogramCodecManager; import net.opentsdb.core.IncomingDataPoint; import net.opentsdb.core.TSDB; @@ -59,7 +29,11 @@ import net.opentsdb.tsd.KafkaRpcPluginGroup.TsdbConsumerType; import net.opentsdb.utils.Config; import net.opentsdb.utils.JSON; - +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.utils.Bytes; import org.hbase.async.HBaseException; import org.hbase.async.PleaseThrottleException; import org.junit.Before; @@ -71,22 +45,34 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.RateLimiter; -import com.stumbleupon.async.Deferred; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.Assert.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; +import static org.powermock.api.mockito.PowerMockito.mock; @PowerMockIgnore({ "javax.management.*" }) @RunWith(PowerMockRunner.class) @PrepareForTest({ TSDB.class, KafkaRpcPluginThread.class, Config.class, Thread.class, - Consumer.class, ConsumerConfig.class, PleaseThrottleException.class}) + KafkaConsumer.class, ConsumerConfig.class, PleaseThrottleException.class}) public class TestKafkaRpcPluginThread { private static final String METRIC = "sys.cpu.user"; private static final String METRIC2 = "sys.cpu.iowait"; private static final String PREFIX = "sys"; private static final long TS = 1492641000L; private static final String LOCALHOST = "localhost"; - private static final String TOPICS = "TSDB_600_1,TSDB_600_2,TSDB_600_3"; + private static final List TOPICS = Arrays.asList("TSDB_600_1","TSDB_600_2","TSDB_600_3"); private static final String GROUPID = "testGroup"; private static final String ZKS = "192.168.1.1:2181"; private Map TAGS = ImmutableMap.builder() @@ -96,10 +82,9 @@ public class TestKafkaRpcPluginThread { private TSDB tsdb; private KafkaRpcPluginConfig config; private KafkaRpcPluginGroup group; - private ConsumerConnector consumer_connector; - private ConsumerIterator iterator; - private List> stream_list; - private MessageAndMetadata message; + private KafkaConsumer consumer; + private ConsumerRecords consumerRecords; + private ConsumerRecord consumerRecord; private RateLimiter rate_limiter; private TypedIncomingData data; private KafkaRpcPlugin parent; @@ -113,17 +98,14 @@ public void before() throws Exception { tsdb = PowerMockito.mock(TSDB.class); config = new KafkaRpcPluginConfig(new Config(false)); group = mock(KafkaRpcPluginGroup.class); - message = mock(MessageAndMetadata.class); rate_limiter = mock(RateLimiter.class); requeue = mock(KafkaStorageExceptionHandler.class); counters = new ConcurrentHashMap>(); deserializer = new JSONDeserializer(); - - consumer_connector = mock(ConsumerConnector.class); - mockStatic(Consumer.class); - when(Consumer.createJavaConsumerConnector((ConsumerConfig) any())) - .thenReturn(consumer_connector); + consumer = mock(KafkaConsumer.class); + consumerRecords = mock(ConsumerRecords.class); + consumerRecord = mock(ConsumerRecord.class); when(tsdb.getConfig()).thenReturn(config); when(tsdb.getStorageExceptionHandler()).thenReturn(requeue); @@ -140,30 +122,18 @@ public void before() throws Exception { when(group.getGroupID()).thenReturn(GROUPID); when(group.getConsumerType()).thenReturn(TsdbConsumerType.RAW); when(group.getDeserializer()).thenReturn(deserializer); - + config.overrideConfig(KafkaRpcPluginConfig.KAFKA_CONFIG_PREFIX + "zookeeper.connect", ZKS); - - stream_list = mock(List.class); - when(consumer_connector.createMessageStreamsByFilter( - (TopicFilter) any(), anyInt())).thenReturn(stream_list); - - final KafkaStream stream = mock(KafkaStream.class); - when(stream_list.get(0)).thenReturn(stream); - iterator = mock(ConsumerIterator.class); - when(stream.iterator()).thenReturn(iterator); - - when(iterator.hasNext()).thenReturn(true).thenReturn(false); - when(iterator.next()).thenReturn(message); - - PowerMockito.mockStatic(ConsumerConfig.class); - PowerMockito.whenNew(ConsumerConfig.class).withAnyArguments() - .thenReturn(mock(ConsumerConfig.class)); - - PowerMockito.mockStatic(Consumer.class); - when(Consumer.createJavaConsumerConnector(any(ConsumerConfig.class))) - .thenReturn(consumer_connector); + PowerMockito.mockStatic(KafkaConsumer.class); + PowerMockito.whenNew(KafkaConsumer.class).withAnyArguments() + .thenReturn(consumer); + when(consumer.poll(anyLong())) + .thenReturn(consumerRecords) + .thenReturn(null); + when(consumerRecords.iterator()) + .thenReturn(Collections.singletonList(consumerRecord).iterator()); } @Test @@ -224,7 +194,7 @@ public void ctorNullTopics() throws Exception { @Test(expected = IllegalArgumentException.class) public void ctorEmptyTopics() throws Exception { - new KafkaRpcPluginThread(group, 1, ""); + new KafkaRpcPluginThread(group, 1, Collections.emptyList()); } @Test(expected = IllegalArgumentException.class) @@ -281,15 +251,6 @@ public void buildConsumerPropertiesDefaults() throws Exception { assertEquals( Integer.toString(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS_DEFAULT), props.get(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS)); - assertEquals( - Integer.toString(KafkaRpcPluginConfig.REBALANCE_RETRIES_DEFAULT), - props.get(KafkaRpcPluginConfig.REBALANCE_RETRIES)); - assertEquals( - Integer.toString(KafkaRpcPluginConfig.ZK_SESSION_TIMEOUT_DEFAULT), - props.get(KafkaRpcPluginConfig.ZOOKEEPER_SESSION_TIMEOUT_MS)); - assertEquals( - Integer.toString(KafkaRpcPluginConfig.ZK_CONNECTION_TIMEOUT_DEFAULT), - props.get(KafkaRpcPluginConfig.ZOOKEEPER_CONNECTION_TIMEOUT_MS)); assertEquals(ZKS, props.get("zookeeper.connect")); } @@ -316,15 +277,6 @@ public void buildConsumerPropertiesGroupOverride() throws Exception { assertEquals( Integer.toString(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS_DEFAULT), props.get(KafkaRpcPluginConfig.REBALANCE_BACKOFF_MS)); - assertEquals( - Integer.toString(KafkaRpcPluginConfig.REBALANCE_RETRIES_DEFAULT), - props.get(KafkaRpcPluginConfig.REBALANCE_RETRIES)); - assertEquals( - Integer.toString(KafkaRpcPluginConfig.ZK_SESSION_TIMEOUT_DEFAULT), - props.get(KafkaRpcPluginConfig.ZOOKEEPER_SESSION_TIMEOUT_MS)); - assertEquals( - Integer.toString(KafkaRpcPluginConfig.ZK_CONNECTION_TIMEOUT_DEFAULT), - props.get(KafkaRpcPluginConfig.ZOOKEEPER_CONNECTION_TIMEOUT_MS)); assertEquals("10.0.0.1:2181", props.get("zookeeper.connect")); } @@ -340,31 +292,13 @@ public void shutdown() throws Exception { final KafkaRpcPluginThread writer = new KafkaRpcPluginThread(group, 1, TOPICS); writer.run(); - writer.shutdown(); - verify(consumer_connector, times(1)).shutdown(); + writer.close(); + verify(consumer, times(1)).close(); } - - @Test - public void runNoData() throws Exception { - when(iterator.hasNext()).thenReturn(false); - final KafkaRpcPluginThread writer = Mockito.spy( - new KafkaRpcPluginThread(group, 1, TOPICS)); - writer.run(); - verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap()); - verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(), - any(byte[].class), anyMap()); - verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(), - anyMap(), anyBoolean(), anyString(), anyString(), anyString()); - verify(consumer_connector, times(1)) - .createMessageStreamsByFilter(any(TopicFilter.class), anyInt()); - verify(writer, times(1)).shutdown(); - verify(consumer_connector, times(1)).shutdown(); - } - @Test public void runNoDataRestart() throws Exception { - when(iterator.hasNext()).thenReturn(false); + when(consumer.poll(anyLong())).thenReturn(null); final KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); @@ -375,29 +309,8 @@ public void runNoDataRestart() throws Exception { any(byte[].class), anyMap()); verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(), anyMap(), anyBoolean(), anyString(), anyString(), anyString()); - verify(consumer_connector, times(2)) - .createMessageStreamsByFilter(any(TopicFilter.class), anyInt()); verify(writer, times(2)).shutdown(); - verify(consumer_connector, times(2)).shutdown(); - } - - @Test - public void runNoStreams() throws Exception { - when(stream_list.get(0)) - .thenThrow(new ArrayIndexOutOfBoundsException()); - - KafkaRpcPluginThread writer = Mockito.spy( - new KafkaRpcPluginThread(group, 1, TOPICS)); - writer.run(); - verify(tsdb, never()).addPoint(anyString(), anyLong(), anyLong(), anyMap()); - verify(tsdb, never()).addHistogramPoint(anyString(), anyLong(), - any(byte[].class), anyMap()); - verify(tsdb, never()).addAggregatePoint(anyString(), anyLong(), anyLong(), - anyMap(), anyBoolean(), anyString(), anyString(), anyString()); - verify(consumer_connector, times(1)) - .createMessageStreamsByFilter(any(TopicFilter.class), anyInt()); - verify(writer, times(1)).shutdown(); - verify(consumer_connector, times(1)).shutdown(); + verify(consumer, times(2)).close(); } @Test @@ -541,7 +454,7 @@ public void runGoodMessageRequeueHistogram() throws Exception { @Test public void runEmptyData() throws Exception { - when(message.message()).thenReturn(new byte[] { '{', '}' }); + when(consumerRecord.value()).thenReturn(new Bytes(new byte[] { '{', '}' })); KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); writer.run(); @@ -559,7 +472,7 @@ public void runEmptyMetric() throws Exception { when(tsdb.addPoint(anyString(), anyLong(), anyLong(), anyMap())) .thenReturn(Deferred.fromResult(null)); data = new Metric(null, TS, "42", TAGS); - when(message.message()).thenReturn(JSON.serializeToBytes(data)); + when(consumerRecord.value()).thenReturn(new Bytes(JSON.serializeToBytes(data))); KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); @@ -674,80 +587,23 @@ public void runRequeueThrottling() throws Exception { @Test public void runTooManyRuntimeException() throws Exception { - when(iterator.hasNext()).thenReturn(true); - when(iterator.next()).thenThrow(new RuntimeException("Foobar")); - KafkaRpcPluginThread writer = Mockito.spy( - new KafkaRpcPluginThread(group, 1, TOPICS)); - writer.run(); - - verify(writer, times(1)).shutdown(); - verify(consumer_connector, times(1)).shutdown(); - } - - @Test - public void runIteratorHasNextRuntimeException() throws Exception { - when(iterator.hasNext()).thenThrow(new RuntimeException("Foobar")); + when(consumerRecords.iterator()).thenThrow(new RuntimeException("Foobar")); KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); writer.run(); verify(writer, times(1)).shutdown(); - verify(consumer_connector, times(1)).shutdown(); + verify(consumer, times(1)).close(); } @Test(expected = Exception.class) - public void runIteratorHasNextException() throws Exception { - when(iterator.hasNext()).thenThrow(new Exception("Foobar")); + public void consumerRecordsHasException() throws Exception { + when(consumerRecords.iterator()).thenThrow(new Exception("Foobar")); KafkaRpcPluginThread writer = Mockito.spy( new KafkaRpcPluginThread(group, 1, TOPICS)); writer.run(); } - @Test - public void runIteratorNextRuntimeException() throws Exception { - when(iterator.next()).thenThrow(new RuntimeException("Foobar")); - KafkaRpcPluginThread writer = Mockito.spy( - new KafkaRpcPluginThread(group, 1, TOPICS)); - writer.run(); - - verify(writer, times(1)).shutdown(); - verify(consumer_connector, times(1)).shutdown(); - } - - @Test(expected = Exception.class) - public void runIteratorNextException() throws Exception { - when(iterator.next()).thenThrow(new Exception("Foobar")); - KafkaRpcPluginThread writer = Mockito.spy( - new KafkaRpcPluginThread(group, 1, TOPICS)); - writer.run(); - } - - @Test - public void runConsumerRuntimeException() throws Exception { - when(consumer_connector.createMessageStreamsByFilter( - (TopicFilter) any(), anyInt())).thenThrow( - new RuntimeException("Foobar")); - KafkaRpcPluginThread writer = Mockito.spy( - new KafkaRpcPluginThread(group, 1, TOPICS)); - writer.run(); - - verify(writer, times(1)).shutdown(); - verify(consumer_connector, times(1)).shutdown(); - } - - @Test(expected = Exception.class) - public void runConsumerException() throws Exception { - when(consumer_connector.createMessageStreamsByFilter( - (TopicFilter) any(), anyInt())).thenThrow( - new Exception("Foobar")); - KafkaRpcPluginThread writer = Mockito.spy( - new KafkaRpcPluginThread(group, 1, TOPICS)); - writer.run(); - - verify(writer, times(1)).shutdown(); - verify(consumer_connector, times(1)).shutdown(); - } - // ------ HELPERS --------- private void setupRawData(final boolean requeued) { @@ -757,7 +613,7 @@ private void setupRawData(final boolean requeued) { if (requeued) { data.setRequeueTS(TS + 60); } - when(message.message()).thenReturn(JSON.serializeToBytes(data)); + when(consumerRecord.value()).thenReturn(new Bytes(JSON.serializeToBytes(data))); } private void setupRawDataList(final boolean requeued) { @@ -779,7 +635,7 @@ private void setupRawDataList(final boolean requeued) { sb.append(new String(JSON.serializeToBytes(ev2))); sb.append("]"); - when(message.message()).thenReturn(sb.toString().getBytes()); + when(consumerRecord.value()).thenReturn(new Bytes(sb.toString().getBytes())); } private void setupAggData(final boolean requeued, final boolean is_group_by) { @@ -794,7 +650,7 @@ private void setupAggData(final boolean requeued, final boolean is_group_by) { if (requeued) { data.setRequeueTS(TS + 60); } - when(message.message()).thenReturn(JSON.serializeToBytes(data)); + when(consumerRecord.value()).thenReturn(new Bytes(JSON.serializeToBytes(data))); } private void setupHistogramData(final boolean requeued) { @@ -819,7 +675,7 @@ private void setupHistogramData(final boolean requeued) { if (requeued) { data.setRequeueTS(TS + 60); } - when(message.message()).thenReturn(JSON.serializeToBytes(data)); + when(consumerRecord.value()).thenReturn(new Bytes(JSON.serializeToBytes(data))); } /** @@ -908,10 +764,7 @@ private void verifyCtrsZero(final String[] types, final String namespace) { private void verifyMessageRead(final KafkaRpcPluginThread writer, final boolean requeued) { verify(writer, times(1)).shutdown(); - verify(consumer_connector, times(1)).shutdown(); - verify(consumer_connector, times(1)) - .createMessageStreamsByFilter(any(TopicFilter.class), anyInt()); - verify(iterator, times(2)).hasNext(); + verify(consumer, times(1)).close(); if (requeued) { verify(requeue, times(1)).handleError( any(IncomingDataPoint.class), any(Exception.class));