Skip to content

Commit

Permalink
♻️ Remove all async code from MongoDB Websocket API thanks to virtual…
Browse files Browse the repository at this point in the history
… threads
  • Loading branch information
ujibang committed Feb 18, 2024
1 parent b39b610 commit 45101de
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 560 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
package org.restheart.mongodb.handlers.changestreams;

import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.bson.BsonArray;
Expand All @@ -31,80 +34,131 @@
import org.bson.Document;
import org.restheart.mongodb.RHMongoClients;
import org.restheart.utils.BsonUtils;
import org.restheart.utils.LambdaUtils;
import org.restheart.utils.ThreadsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;

import io.undertow.websockets.core.WebSocketCallback;
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;

/**
* ChangeStreamWorker initiates and monitors the change changeStream and submit change event notifications
* ChangeStreamWorker initiates and monitors the MongoDB change stream
* and dispaches virtual threads to send change event to clients
*
* @author Andrea Di Cesare {@literal <[email protected]>}
*/
public class ChangeStreamWorker implements Runnable {

private static final Logger LOGGER = LoggerFactory.getLogger(ChangeStreamWorker.class);

private final ChangeStreamKey changeStreamKey;
private final ChangeStreamWorkerKey key;
private final List<BsonDocument> resolvedStages;
private final String dbName;
private final String collName;
private final Set<WebSocketSession> websocketSessions = Collections.synchronizedSet(new HashSet<>());

public ChangeStreamWorker(ChangeStreamKey changeStreamKey, List<BsonDocument> resolvedStages, String dbName, String collName) {
public ChangeStreamWorker(ChangeStreamWorkerKey key, List<BsonDocument> resolvedStages, String dbName, String collName) {
super();
this.changeStreamKey = changeStreamKey;
this.key = key;
this.resolvedStages = resolvedStages;
this.dbName = dbName;
this.collName = collName;
}

public ChangeStreamWorkerKey getKey() {
return this.key;
}

public String getDbName() {
return this.dbName;
}

public String getCollName() {
return this.collName;
}

@Override
public void run() {
var changeStream = starChangeStream();
LOGGER.debug("Change Stream Worker {} started listening for change events", this.key);

try {
changeStream.forEach(notification -> onNext(notification));
changeStream.forEach(changeEvent -> {
if (this.websocketSessions.isEmpty()) {
// this terminates the ChangeStreamWorker
LambdaUtils.throwsSneakyException(new NoMoreWebSocketException());
}

var msg = BsonUtils.toJson(getDocument(changeEvent), key.getJsonMode());

this.websocketSessions.stream().forEach(session -> ThreadsUtils.virtualThreadsExecutor().execute(() -> {
LOGGER.debug("Sending change event to WebSocket session {}", session.getId());
try {
this.send(session, msg);
} catch (Throwable t) {
LOGGER.error("Error sending change event to WebSocket session ", session.getId(), t);
}
}));
});
} catch(Throwable t) {
closeAllWebSocketSessionsOnError(changeStreamKey);
if (t instanceof NoMoreWebSocketException) {
ChangeStreamWorkers.getInstance().remove(key);
LOGGER.debug("Closing Change Stream Worker {} since it has no active WebSocket sessions", key);
} else {
LOGGER.error("Change Stream Worker {} died due to execption", key, t);
}
} finally {
closeAllWebSocketSessions();
}
}

private void onNext(ChangeStreamDocument<?> notification) {
if (!WebSocketSessions.getInstance().get(changeStreamKey).isEmpty()) {
LOGGER.trace("[clients watching]: " + WebSocketSessions.getInstance().get(changeStreamKey).size());
public Set<WebSocketSession> websocketSessions() {
return this.websocketSessions;
}

LOGGER.trace("change stream notification for changeStreamKey={}: {}", changeStreamKey, notification);
private void send(WebSocketSession session, String message) {
WebSockets.sendText(message, session.getChannel(), new WebSocketCallback<Void>() {
@Override
public void complete(final WebSocketChannel channel, Void context) {
}

Notifications.submit(new Notification(changeStreamKey, BsonUtils.toJson(getDocument(notification), changeStreamKey.getJsonMode())));
} else {
LOGGER.debug("closing change stream worker with no active websocket sessions, changeStreamKey=" + changeStreamKey);
ChangeStreams.getInstance().remove(changeStreamKey);
// exit the infinite changeStream.forEeach() loop and termminate the thread
throw new IllegalStateException("terminate due to no active websocket sessions");
}
}
@Override
public void onError(final WebSocketChannel channel, Void context, Throwable throwable) {
// close WebSocket session

private void closeAllWebSocketSessionsOnError(ChangeStreamKey cs) {
var webSocketSessions = WebSocketSessions.getInstance();
var websocketsToClose = webSocketSessions.get(cs);
var sid = session.getId();
websocketSessions().removeIf(s -> s.getId().equals(sid));
LOGGER.info("WebSocket session closed {}", session.getId());

websocketsToClose.stream()
try {
session.close();
} catch (IOException e) {
LOGGER.warn("Error closing WebSocket session {}", session.getId());
}
}
});
}

void closeAllWebSocketSessions() {
websocketSessions.stream()
.collect(Collectors.toSet())
.forEach(wsk -> {
try {
wsk.close();
webSocketSessions.remove(cs, wsk);
} catch(IOException ioe) {
LOGGER.warn("Error closing websocket session {}", wsk, ioe);
LOGGER.warn("Error closing WebSocket session {}", wsk, ioe);
}
});

ChangeStreams.getInstance().remove(cs);
}

private static class NoMoreWebSocketException extends Exception {}

private ChangeStreamIterable<Document> starChangeStream() {
try {
return RHMongoClients.mclient()
Expand All @@ -129,7 +183,7 @@ private BsonDocument getDocument(ChangeStreamDocument<?> notification) {
try {
doc.put("fullDocument", BsonUtils.documentToBson((Document) notification.getFullDocument()));
} catch(ClassCastException cce) {
LOGGER.warn("change stream fullDocument is not json {}", notification.getFullDocument());
LOGGER.warn("change event fullDocument is not json {}", notification.getFullDocument());
doc.put("fullDocument", BsonNull.VALUE);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@
import io.undertow.websockets.spi.WebSocketHttpExchange;

/**
* Idendify a MongoDB change stream
* Idendifies a ChangeStreamWorker
*
* @author Andrea Di Cesare {@literal <[email protected]>}
*/
public class ChangeStreamKey {
public class ChangeStreamWorke {
private final String url;
private final BsonDocument avars;
private final JsonMode jsonMode;

public ChangeStreamKey(String url, BsonDocument avars, JsonMode jsonMode) {
public ChangeStreamWorke(String url, BsonDocument avars, JsonMode jsonMode) {
this.url = url;
this.avars = avars;
this.jsonMode = jsonMode;
}

public ChangeStreamKey(WebSocketHttpExchange exchange) {
public ChangeStreamWorke(WebSocketHttpExchange exchange) {
if (!exchange.getQueryString().isEmpty()) {
var qstring = encode("?".concat(exchange.getQueryString()));
var uri = encode(exchange.getRequestURI());
Expand All @@ -63,7 +63,7 @@ public ChangeStreamKey(WebSocketHttpExchange exchange) {
this.jsonMode = exchange.getAttachment(GetChangeStreamHandler.JSON_MODE_ATTACHMENT_KEY);
}

public ChangeStreamKey(HttpServerExchange exchange) {
public ChangeStreamWorke(HttpServerExchange exchange) {
this.url = encode(exchange.getRequestPath());

this.avars = exchange.getAttachment(GetChangeStreamHandler.AVARS_ATTACHMENT_KEY);
Expand All @@ -77,7 +77,7 @@ public int hashCode() {

@Override
public boolean equals(Object obj) {
if (!(obj instanceof ChangeStreamKey)) {
if (!(obj instanceof ChangeStreamWorke)) {
return false;
} else {
return obj.hashCode() == this.hashCode();
Expand All @@ -88,7 +88,7 @@ public boolean equals(Object obj) {
public String toString() {
var _url = this.url == null ? null : URLDecoder.decode(this.url);

return "ChangeStreamKey{url: " + _url + ", avars: " + BsonUtils.toJson(this.avars) + ", jsonMode: " + this.jsonMode + "}";
return "ChangeStreamWorkerKey{url: " + _url + ", avars: " + BsonUtils.toJson(this.avars) + ", jsonMode: " + this.jsonMode + "}";
}

private static String encode(String queryString) {
Expand Down
Loading

0 comments on commit 45101de

Please sign in to comment.