Skip to content

Commit

Permalink
#154 Log redis metrics to Eventbus
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Mar 25, 2024
1 parent f249d1c commit 9d63d5f
Show file tree
Hide file tree
Showing 9 changed files with 325 additions and 20 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ The following configuration values are available:
| httpRequestHandlerPort | 7070 | The port of the HTTP API |
| httpRequestHandlerUserHeader | x-rp-usr | The name of the header property where the user information is provided. Used for the HTTP API |
| queueConfigurations | | Configure retry intervals and enqueue delaying for queue patterns |
| publish-metrics-address | | The EventBus address to send collected redis metrics to |
| metric-storage-name | queue | The name of the storage used in the published metrics |
| metric-refresh-period | 10 | The frequency [s] of collecting metrics from redis database |

### Configuration util

Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- TEST dependencies -->
<dependency>
Expand Down Expand Up @@ -388,6 +393,7 @@
<slf4j.version>2.0.10</slf4j.version>
<mockito.version>5.8.0</mockito.version>
<junit.version>4.13.2</junit.version>
<guava.version>33.1.0-jre</guava.version>
<awaitility.version>4.2.0</awaitility.version>
<jedis.version>3.7.0</jedis.version>
<rest-assured.version>5.4.0</rest-assured.version>
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.swisspush.redisques;

import com.google.common.base.Strings;
import io.vertx.core.*;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
Expand Down Expand Up @@ -91,6 +92,7 @@ private enum QueueState {
private MemoryUsageProvider memoryUsageProvider;
private QueueActionFactory queueActionFactory;
private RedisquesConfigurationProvider configurationProvider;
private RedisMonitor redisMonitor;

private Map<QueueOperation, QueueAction> queueActions = new HashMap<>();

Expand Down Expand Up @@ -257,6 +259,19 @@ private void initialize() {

registerActiveQueueRegistrationRefresh();
registerQueueCheck();
registerMetricsGathering(configuration);
}

private void registerMetricsGathering(RedisquesConfiguration configuration){
String metricsAddress = configuration.getPublishMetricsAddress();
if(Strings.isNullOrEmpty(metricsAddress)) {
return;
}
String metricStorageName = configuration.getMetricStorageName();
int metricRefreshPeriod = configuration.getMetricRefreshPeriod();

redisMonitor = new RedisMonitor(vertx, redisProvider, metricsAddress, metricStorageName, metricRefreshPeriod);
redisMonitor.start();
}

private void registerActiveQueueRegistrationRefresh() {
Expand Down Expand Up @@ -379,6 +394,9 @@ private void unsupportedOperation(String operation, Message<JsonObject> event) {
@Override
public void stop() {
unregisterConsumers(true);
if(redisMonitor != null) {
redisMonitor.stop();

Check warning on line 398 in src/main/java/org/swisspush/redisques/RedisQues.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/redisques/RedisQues.java#L398

Added line #L398 was not covered by tests
}
}

private void gracefulStop(final Handler<Void> doneHandler) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.swisspush.redisques.util;

import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

public class EventBusMetricsPublisher implements MetricsPublisher {

private final Vertx vertx;
private final String monitoringAddress;
private final String prefix;

public EventBusMetricsPublisher(Vertx vertx, String monitoringAddress, String prefix) {
this.vertx = vertx;
this.monitoringAddress = monitoringAddress;
this.prefix = prefix;
}

@Override
public void publishMetric(String name, long value) {
vertx.eventBus().publish(monitoringAddress,
new JsonObject().put("name", prefix + name).put("action", "set").put("n", value));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.swisspush.redisques.util;

public interface MetricsPublisher {

void publishMetric(String name, long value);
}
107 changes: 107 additions & 0 deletions src/main/java/org/swisspush/redisques/util/RedisMonitor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package org.swisspush.redisques.util;

import com.google.common.base.Splitter;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RedisMonitor {
private final Vertx vertx;
private final RedisProvider redisProvider;
private final int period;
private long timer;
private final Logger log = LoggerFactory.getLogger(RedisMonitor.class);

private static final String DELIMITER = ":";

private final MetricsPublisher publisher;

/**
* @param vertx vertx
* @param redisProvider RedisProvider
* @param name name
* @param period in seconds.
*/
public RedisMonitor(Vertx vertx, RedisProvider redisProvider, String monitoringAddress, String name, int period) {
this(vertx, redisProvider, name, period,
new EventBusMetricsPublisher(vertx, monitoringAddress, "redis." + name + ".")
);
}

public RedisMonitor(Vertx vertx, RedisProvider redisProvider, String name, int period, MetricsPublisher publisher) {
this.vertx = vertx;
this.redisProvider = redisProvider;
this.period = period * 1000;
this.publisher = publisher;
}

public void start() {
timer = vertx.setPeriodic(period, timer -> redisProvider.redis().onSuccess(redisAPI -> {
redisAPI.info(new ArrayList<>()).onComplete(event -> {
if (event.succeeded()) {
collectMetrics(event.result().toBuffer());
} else {
log.warn("Cannot collect INFO from redis");

Check warning on line 50 in src/main/java/org/swisspush/redisques/util/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/redisques/util/RedisMonitor.java#L50

Added line #L50 was not covered by tests
}
});
}).onFailure(throwable -> log.warn("Cannot collect INFO from redis", throwable)));
}

public void stop() {
if (timer != 0) {
vertx.cancelTimer(timer);

Check warning on line 58 in src/main/java/org/swisspush/redisques/util/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/redisques/util/RedisMonitor.java#L58

Added line #L58 was not covered by tests
}
}

Check warning on line 60 in src/main/java/org/swisspush/redisques/util/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/redisques/util/RedisMonitor.java#L60

Added line #L60 was not covered by tests

private void collectMetrics(Buffer buffer) {
Map<String, String> map = new HashMap<>();

Splitter.on(System.lineSeparator()).omitEmptyStrings()
.trimResults().splitToList(buffer.toString()).stream()
.filter(input -> input != null && input.contains(DELIMITER)
&& !input.contains("executable")
&& !input.contains("config_file")).forEach(entry -> {
List<String> keyValue = Splitter.on(DELIMITER).omitEmptyStrings().trimResults().splitToList(entry);
if (keyValue.size() == 2) {
map.put(keyValue.get(0), keyValue.get(1));
}
});

log.debug("got redis metrics {}", map);

map.forEach((key, valueStr) -> {
long value;
try {
if (key.startsWith("db")) {
String[] pairs = valueStr.split(",");
for (String pair : pairs) {
String[] tokens = pair.split("=");
if (tokens.length == 2) {
value = Long.parseLong(tokens[1]);
publisher.publishMetric("keyspace." + key + "." + tokens[0], value);
} else {
log.warn("Invalid keyspace property. Will be ignored");

Check warning on line 89 in src/main/java/org/swisspush/redisques/util/RedisMonitor.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/swisspush/redisques/util/RedisMonitor.java#L89

Added line #L89 was not covered by tests
}
}
} else if (key.contains("_cpu_")) {
value = (long) (Double.parseDouble(valueStr) * 1000.0);
publisher.publishMetric(key, value);
} else if (key.contains("fragmentation_ratio")) {
value = (long) (Double.parseDouble(valueStr));
publisher.publishMetric(key, value);
} else {
value = Long.parseLong(valueStr);
publisher.publishMetric(key, value);
}
} catch (NumberFormatException e) {
// ignore this field
}
});
}
}
Loading

0 comments on commit 9d63d5f

Please sign in to comment.