forked from vert-x3/vertx-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDashboardVerticle.java
63 lines (49 loc) · 2.03 KB
/
DashboardVerticle.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package io.vertx.example.kafka.dashboard;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.handler.StaticHandler;
import io.vertx.ext.web.handler.sockjs.BridgeOptions;
import io.vertx.ext.web.handler.sockjs.PermittedOptions;
import io.vertx.ext.web.handler.sockjs.SockJSHandler;
import io.vertx.kafka.client.consumer.KafkaReadStream;
import java.util.Collections;
public class DashboardVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
Router router = Router.router(vertx);
// The event bus bridge handler
BridgeOptions options = new BridgeOptions();
options.setOutboundPermitted(Collections.singletonList(new PermittedOptions().setAddress("dashboard")));
router.mountSubRouter("/eventbus", SockJSHandler.create(vertx).bridge(options));
// The web server handler
router.route().handler(StaticHandler.create().setCachingEnabled(false));
// Start http server
HttpServer httpServer = vertx.createHttpServer();
httpServer.requestHandler(router).listen(8080, ar -> {
if (ar.succeeded()) {
System.out.println("Http server started");
} else {
ar.cause().printStackTrace();
}
});
// Our dashboard that aggregates metrics from various kafka topics
JsonObject dashboard = new JsonObject();
// Publish the dashboard to the browser over the bus
vertx.setPeriodic(1000, timerID -> {
vertx.eventBus().publish("dashboard", dashboard);
});
// Get the Kafka consumer config
JsonObject config = config();
// Create the consumer
KafkaReadStream<String, JsonObject> consumer = KafkaReadStream.create(vertx, config.getMap(), String.class, JsonObject.class);
// Aggregates metrics in the dashboard
consumer.handler(record -> {
JsonObject obj = record.value();
dashboard.mergeIn(obj);
});
// Subscribe to Kafka
consumer.subscribe(Collections.singleton("the_topic"));
}
}