Skip to content

Commit

Permalink
0.8.0
Browse files Browse the repository at this point in the history
Add support for concurrent requests to the Haystack server. Max Connections can be modified in the Edit Server action.
  • Loading branch information
a-hansen authored Jul 15, 2021
1 parent 00a79f7 commit cadcb37
Show file tree
Hide file tree
Showing 7 changed files with 146 additions and 39 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apply plugin: 'java-library'
mainClassName = 'org.dsa.iot.haystack.Main'
sourceCompatibility = 1.7
targetCompatibility = 1.7
version = '0.7.0'
version = '0.8.0'

repositories {
mavenLocal()
Expand Down
2 changes: 1 addition & 1 deletion dslink.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "dslink-java-haystack",
"version": "0.7.0",
"version": "0.8.0",
"description": "An implementation dslink of a haystack protocol consumer",
"license": "Apache",
"author": {
Expand Down
131 changes: 110 additions & 21 deletions src/main/java/org/dsa/iot/haystack/Haystack.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.dsa.iot.haystack;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
Expand Down Expand Up @@ -42,12 +44,16 @@
public class Haystack {

private static final Logger LOGGER = LoggerFactory.getLogger(Haystack.class);

private final ConnectionHelper conn;
private final NavHelper navHelper;
private final Node node;
private Set<HRef> pendingSubscribe;
private Set<HRef> pendingUnsubscribe;
private ScheduledFuture<?> pollFuture;
private final ScheduledThreadPoolExecutor stpe;
private final Map<String, Node> subs;
private boolean updating;
private boolean watchEnabled;

public Haystack(final Node node) {
Expand Down Expand Up @@ -85,6 +91,10 @@ public Haystack(final Node node) {
node.setConfig("read timeout", new Value(60));
}

if (node.getConfig("maxConnections") == null) {
node.setConfig("maxConnections", new Value(5));
}

this.stpe = Objects.createDaemonThreadPool();
this.node = node;
this.subs = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -148,6 +158,7 @@ public void editConnection(String url,
int pollRate,
int connTimeout,
int readTimeout,
int maxConnections,
boolean enabled) {
LOGGER.info("Edit Server url={} user={} enabled={}", url, user, enabled);
node.setRoConfig("lu", new Value(0));
Expand All @@ -173,7 +184,7 @@ public void editConnection(String url,
if (!enabled) {
Utils.getStatusNode(node).setValue(new Value("Disabled"));
} else {
conn.editConnection(url, user, pass, connTimeout, readTimeout);
conn.editConnection(url, user, pass, connTimeout, readTimeout, maxConnections);
setupPoll(pollRate);
}

Expand Down Expand Up @@ -219,6 +230,24 @@ public ScheduledThreadPoolExecutor getStpe() {
return stpe;
}

public int getMaxConnections() {
Value v = node.getConfig("maxConnections");
if (v != null) {
Number n = v.getNumber();
if (n != null) {
return n.intValue();
}
}
return 5;
}

public void setMaxConnections(int max) {
if (max < 1) {
throw new IllegalArgumentException("Max connections must be > 1: " + max);
}
node.setConfig("maxConnections", new Value(max));
}

public static void init(Node superRoot) {
NodeBuilder builder = Utils.getBuilder(superRoot, "addServer");
builder.setDisplayName("Add Server");
Expand Down Expand Up @@ -335,21 +364,50 @@ public void stop() {
conn.close();
}

public void subscribe(HRef id, Node node) {
subscribe(id, node, true);
public void subscribe(final HRef id, Node node) {
if (!isEnabled() || !watchEnabled) {
return;
}
synchronized (this) {
subs.put(id.toString(), node);
if (pendingSubscribe == null) {
pendingSubscribe = new HashSet<>();
}
if (pendingUnsubscribe != null) {
pendingUnsubscribe.remove(id);
}
if (pendingSubscribe.add(id)) {
stpe.schedule(new Runnable() {
@Override
public void run() {
updateSubscriptions();
}
}, 2, TimeUnit.SECONDS);
}
}
}

public void unsubscribe(final HRef id) {
if (!isEnabled() || !watchEnabled) {
return;
}
subs.remove(id.toString());
conn.getWatch(new StateHandler<HWatch>() {
@Override
public void handle(HWatch event) {
event.unsub(new HRef[]{id});
synchronized (this) {
subs.remove(id.toString());
if (pendingUnsubscribe == null) {
pendingUnsubscribe = new HashSet<>();
}
});
if (pendingSubscribe != null) {
pendingSubscribe.remove(id);
}
if (pendingUnsubscribe.add(id)) {
stpe.schedule(new Runnable() {
@Override
public void run() {
updateSubscriptions();
}
}, 2, TimeUnit.SECONDS);
}
}
}

void destroy() {
Expand Down Expand Up @@ -425,19 +483,50 @@ public void run() {
}, time, time, TimeUnit.SECONDS);
}

private void subscribe(final HRef id, Node node, boolean add) {
if (!isEnabled() || !watchEnabled) {
return;
}
if (add) {
subs.put(id.toString(), node);
private void updateSubscriptions() {
Set<HRef> toSubscribe;
Set<HRef> toUnsubscribe;
synchronized (this) {
if (updating) {
return;
}
updating = true;
toSubscribe = pendingSubscribe;
toUnsubscribe = pendingUnsubscribe;
pendingSubscribe = null;
pendingUnsubscribe = null;
}

conn.getWatch(new StateHandler<HWatch>() {
@Override
public void handle(HWatch event) {
event.sub(new HRef[]{id});
try {
while ((toSubscribe != null) || (toUnsubscribe != null)) {
if ((toSubscribe != null) && !toSubscribe.isEmpty()) {
final HRef[] ids = new HRef[toSubscribe.size()];
toSubscribe.toArray(ids);
conn.getWatch(new StateHandler<HWatch>() {
@Override
public void handle(HWatch event) {
event.sub(ids);
}
});
}
if ((toUnsubscribe != null) && !toUnsubscribe.isEmpty()) {
final HRef[] ids = new HRef[toUnsubscribe.size()];
toUnsubscribe.toArray(ids);
conn.getWatch(new StateHandler<HWatch>() {
@Override
public void handle(HWatch event) {
event.unsub(ids);
}
});
}
synchronized (this) {
toSubscribe = pendingSubscribe;
toUnsubscribe = pendingUnsubscribe;
pendingSubscribe = null;
pendingUnsubscribe = null;
}
}
});
} finally {
updating = false;
}
}
}
10 changes: 8 additions & 2 deletions src/main/java/org/dsa/iot/haystack/actions/ServerActions.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void handle(ActionResult event) {
Value vPR = event.getParameter("Poll Rate", ValueType.NUMBER);
Value vConnTimeout = event.getParameter("Connect Timeout");
Value vReadTimeout = event.getParameter("Read Timeout");
Value vMaxConn = event.getParameter("Max Connections");

String url = vUrl.getString();
String user = vUser.getString();
Expand All @@ -128,9 +129,11 @@ public void handle(ActionResult event) {
int pollRate = vPR.getNumber().intValue();
int connTimeout = (int) (vConnTimeout.getNumber().doubleValue() * 1000);
int readTimeout = (int) (vReadTimeout.getNumber().doubleValue() * 1000);
int maxConn = vMaxConn.getNumber().intValue();
haystack.setMaxConnections(maxConn);

haystack.editConnection(
url, user, pass, pollRate, connTimeout, readTimeout, vEnabled.getBool());
haystack.editConnection(url, user, pass, pollRate, connTimeout, readTimeout,
maxConn, vEnabled.getBool());
}
});
{
Expand Down Expand Up @@ -170,6 +173,9 @@ public void handle(ActionResult event) {
a.addParameter(new Parameter(
"Read Timeout", ValueType.NUMBER, node.getConfig("read timeout"))
.setDescription("Read timeout in seconds"));
a.addParameter(new Parameter(
"Max Connections", ValueType.NUMBER, node.getConfig("maxConnections"))
.setDescription("Max concurrent requests to server"));
return a;
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/dsa/iot/haystack/handlers/ListHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public void handle(final Node event) {
public void run() {
if (navId != null) {
String path = event.getPath();
LOGGER.info("Navigating: {} ({})", navId, path);
LOGGER.debug("Navigating: {} ({})", navId, path);
} else {
LOGGER.info("Navigating root");
LOGGER.debug("Navigating root");
}

try {
Expand Down
32 changes: 22 additions & 10 deletions src/main/java/org/dsa/iot/haystack/helpers/ConnectionHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.dsa.iot.dslink.node.Node;
import org.dsa.iot.dslink.node.value.Value;
Expand Down Expand Up @@ -42,6 +43,7 @@ public class ConnectionHelper {
private volatile String url;
private volatile int connectTimeout;
private volatile int readTimeout;
private volatile Semaphore maxConnections;
private final Node statusNode;

private ScheduledFuture<?> connectFuture;
Expand All @@ -54,6 +56,7 @@ public ConnectionHelper(Haystack haystack,
this.haystack = haystack;
this.watchEnabled = watchEnabled;
this.watchDisabled = watchDisabled;
this.maxConnections = new Semaphore(haystack.getMaxConnections(), true);

Node node = haystack.getNode();
username = node.getConfig("username").getString();
Expand All @@ -65,7 +68,7 @@ public ConnectionHelper(Haystack haystack,
}

public void editConnection(String url, String user, String pass, int connTimeout,
int readTimeout) {
int readTimeout, int maxConnections) {
close();
statusNode.setValue(new Value("Not Connected"));
this.url = url;
Expand All @@ -75,6 +78,7 @@ public void editConnection(String url, String user, String pass, int connTimeout
}
this.connectTimeout = connTimeout;
this.readTimeout = readTimeout;
this.maxConnections = new Semaphore(maxConnections, true);
getClient(null);
}

Expand Down Expand Up @@ -134,8 +138,12 @@ public void handle(HClient event) {
}

public void getClient(StateHandler<HClient> onClientReceived) {
Semaphore semaphore = maxConnections;
try {
semaphore.acquire();
connect(onClientReceived);
} catch (InterruptedException x) {
throw new RuntimeException(x);
} catch (CallErrException cee) {
if (onClientReceived != null && onClientReceived.incrementRetryCount() > 1) {
throw cee;
Expand Down Expand Up @@ -163,27 +171,31 @@ public void getClient(StateHandler<HClient> onClientReceived) {
if (rethrow) {
throw x;
}
} finally {
semaphore.release();
}
}

private void connect(Handler<HClient> onConnected) {
synchronized (lock) {
if (connectFuture == null && client != null) {
if (onConnected != null) {
onConnected.handle(client);
}
return;
//passthru
} else if (connectFuture != null) {
if (onConnected != null) {
queue.add(onConnected);
}
return;
} else {
close();
ScheduledThreadPoolExecutor stpe = Objects.getDaemonThreadPool();
Connector c = new Connector(onConnected);
TimeUnit u = TimeUnit.SECONDS;
connectFuture = stpe.scheduleWithFixedDelay(c, 0, 5, u);
return;
}
close();
ScheduledThreadPoolExecutor stpe = Objects.getDaemonThreadPool();
Connector c = new Connector(onConnected);
TimeUnit u = TimeUnit.SECONDS;
connectFuture = stpe.scheduleWithFixedDelay(c, 0, 5, u);
}
if (onConnected != null) {
onConnected.handle(client);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void childSubscribed(Node child) {
}
if (wasEmpty) {
if (id != null) {
LOGGER.info("Subscribing " + node.getDisplayName());
LOGGER.debug("Subscribing " + node.getDisplayName());
haystack.subscribe(id, node);
}
}
Expand All @@ -51,7 +51,7 @@ public void childUnsubscribed(Node child) {
}
if (empty) {
if (id != null) {
LOGGER.info("Unsubscribing " + node.getDisplayName());
LOGGER.debug("Unsubscribing " + node.getDisplayName());
haystack.unsubscribe(id);
}
}
Expand Down

0 comments on commit cadcb37

Please sign in to comment.