diff --git a/build.gradle b/build.gradle index 5266d6f..f51f738 100755 --- a/build.gradle +++ b/build.gradle @@ -4,7 +4,7 @@ apply plugin: 'java-library' mainClassName = 'org.dsa.iot.haystack.Main' sourceCompatibility = 1.7 targetCompatibility = 1.7 -version = '0.7.0' +version = '0.8.0' repositories { mavenLocal() diff --git a/dslink.json b/dslink.json index 2e98bfa..1bbe3ad 100644 --- a/dslink.json +++ b/dslink.json @@ -1,6 +1,6 @@ { "name": "dslink-java-haystack", - "version": "0.7.0", + "version": "0.8.0", "description": "An implementation dslink of a haystack protocol consumer", "license": "Apache", "author": { diff --git a/src/main/java/org/dsa/iot/haystack/Haystack.java b/src/main/java/org/dsa/iot/haystack/Haystack.java index 737de17..81f35db 100755 --- a/src/main/java/org/dsa/iot/haystack/Haystack.java +++ b/src/main/java/org/dsa/iot/haystack/Haystack.java @@ -1,9 +1,11 @@ package org.dsa.iot.haystack; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -42,12 +44,16 @@ public class Haystack { private static final Logger LOGGER = LoggerFactory.getLogger(Haystack.class); + private final ConnectionHelper conn; private final NavHelper navHelper; private final Node node; + private Set pendingSubscribe; + private Set pendingUnsubscribe; private ScheduledFuture pollFuture; private final ScheduledThreadPoolExecutor stpe; private final Map subs; + private boolean updating; private boolean watchEnabled; public Haystack(final Node node) { @@ -85,6 +91,10 @@ public Haystack(final Node node) { node.setConfig("read timeout", new Value(60)); } + if (node.getConfig("maxConnections") == null) { + node.setConfig("maxConnections", new Value(5)); + } + this.stpe = Objects.createDaemonThreadPool(); this.node = node; this.subs = new ConcurrentHashMap<>(); @@ -148,6 +158,7 @@ public void editConnection(String url, int pollRate, int connTimeout, int readTimeout, + int maxConnections, boolean enabled) { LOGGER.info("Edit Server url={} user={} enabled={}", url, user, enabled); node.setRoConfig("lu", new Value(0)); @@ -173,7 +184,7 @@ public void editConnection(String url, if (!enabled) { Utils.getStatusNode(node).setValue(new Value("Disabled")); } else { - conn.editConnection(url, user, pass, connTimeout, readTimeout); + conn.editConnection(url, user, pass, connTimeout, readTimeout, maxConnections); setupPoll(pollRate); } @@ -219,6 +230,24 @@ public ScheduledThreadPoolExecutor getStpe() { return stpe; } + public int getMaxConnections() { + Value v = node.getConfig("maxConnections"); + if (v != null) { + Number n = v.getNumber(); + if (n != null) { + return n.intValue(); + } + } + return 5; + } + + public void setMaxConnections(int max) { + if (max < 1) { + throw new IllegalArgumentException("Max connections must be > 1: " + max); + } + node.setConfig("maxConnections", new Value(max)); + } + public static void init(Node superRoot) { NodeBuilder builder = Utils.getBuilder(superRoot, "addServer"); builder.setDisplayName("Add Server"); @@ -335,21 +364,50 @@ public void stop() { conn.close(); } - public void subscribe(HRef id, Node node) { - subscribe(id, node, true); + public void subscribe(final HRef id, Node node) { + if (!isEnabled() || !watchEnabled) { + return; + } + synchronized (this) { + subs.put(id.toString(), node); + if (pendingSubscribe == null) { + pendingSubscribe = new HashSet<>(); + } + if (pendingUnsubscribe != null) { + pendingUnsubscribe.remove(id); + } + if (pendingSubscribe.add(id)) { + stpe.schedule(new Runnable() { + @Override + public void run() { + updateSubscriptions(); + } + }, 2, TimeUnit.SECONDS); + } + } } public void unsubscribe(final HRef id) { if (!isEnabled() || !watchEnabled) { return; } - subs.remove(id.toString()); - conn.getWatch(new StateHandler() { - @Override - public void handle(HWatch event) { - event.unsub(new HRef[]{id}); + synchronized (this) { + subs.remove(id.toString()); + if (pendingUnsubscribe == null) { + pendingUnsubscribe = new HashSet<>(); } - }); + if (pendingSubscribe != null) { + pendingSubscribe.remove(id); + } + if (pendingUnsubscribe.add(id)) { + stpe.schedule(new Runnable() { + @Override + public void run() { + updateSubscriptions(); + } + }, 2, TimeUnit.SECONDS); + } + } } void destroy() { @@ -425,19 +483,50 @@ public void run() { }, time, time, TimeUnit.SECONDS); } - private void subscribe(final HRef id, Node node, boolean add) { - if (!isEnabled() || !watchEnabled) { - return; - } - if (add) { - subs.put(id.toString(), node); + private void updateSubscriptions() { + Set toSubscribe; + Set toUnsubscribe; + synchronized (this) { + if (updating) { + return; + } + updating = true; + toSubscribe = pendingSubscribe; + toUnsubscribe = pendingUnsubscribe; + pendingSubscribe = null; + pendingUnsubscribe = null; } - - conn.getWatch(new StateHandler() { - @Override - public void handle(HWatch event) { - event.sub(new HRef[]{id}); + try { + while ((toSubscribe != null) || (toUnsubscribe != null)) { + if ((toSubscribe != null) && !toSubscribe.isEmpty()) { + final HRef[] ids = new HRef[toSubscribe.size()]; + toSubscribe.toArray(ids); + conn.getWatch(new StateHandler() { + @Override + public void handle(HWatch event) { + event.sub(ids); + } + }); + } + if ((toUnsubscribe != null) && !toUnsubscribe.isEmpty()) { + final HRef[] ids = new HRef[toUnsubscribe.size()]; + toUnsubscribe.toArray(ids); + conn.getWatch(new StateHandler() { + @Override + public void handle(HWatch event) { + event.unsub(ids); + } + }); + } + synchronized (this) { + toSubscribe = pendingSubscribe; + toUnsubscribe = pendingUnsubscribe; + pendingSubscribe = null; + pendingUnsubscribe = null; + } } - }); + } finally { + updating = false; + } } } diff --git a/src/main/java/org/dsa/iot/haystack/actions/ServerActions.java b/src/main/java/org/dsa/iot/haystack/actions/ServerActions.java index b9242f8..244aa24 100644 --- a/src/main/java/org/dsa/iot/haystack/actions/ServerActions.java +++ b/src/main/java/org/dsa/iot/haystack/actions/ServerActions.java @@ -107,6 +107,7 @@ public void handle(ActionResult event) { Value vPR = event.getParameter("Poll Rate", ValueType.NUMBER); Value vConnTimeout = event.getParameter("Connect Timeout"); Value vReadTimeout = event.getParameter("Read Timeout"); + Value vMaxConn = event.getParameter("Max Connections"); String url = vUrl.getString(); String user = vUser.getString(); @@ -128,9 +129,11 @@ public void handle(ActionResult event) { int pollRate = vPR.getNumber().intValue(); int connTimeout = (int) (vConnTimeout.getNumber().doubleValue() * 1000); int readTimeout = (int) (vReadTimeout.getNumber().doubleValue() * 1000); + int maxConn = vMaxConn.getNumber().intValue(); + haystack.setMaxConnections(maxConn); - haystack.editConnection( - url, user, pass, pollRate, connTimeout, readTimeout, vEnabled.getBool()); + haystack.editConnection(url, user, pass, pollRate, connTimeout, readTimeout, + maxConn, vEnabled.getBool()); } }); { @@ -170,6 +173,9 @@ public void handle(ActionResult event) { a.addParameter(new Parameter( "Read Timeout", ValueType.NUMBER, node.getConfig("read timeout")) .setDescription("Read timeout in seconds")); + a.addParameter(new Parameter( + "Max Connections", ValueType.NUMBER, node.getConfig("maxConnections")) + .setDescription("Max concurrent requests to server")); return a; } diff --git a/src/main/java/org/dsa/iot/haystack/handlers/ListHandler.java b/src/main/java/org/dsa/iot/haystack/handlers/ListHandler.java index bdae6c6..aad29b6 100644 --- a/src/main/java/org/dsa/iot/haystack/handlers/ListHandler.java +++ b/src/main/java/org/dsa/iot/haystack/handlers/ListHandler.java @@ -72,9 +72,9 @@ public void handle(final Node event) { public void run() { if (navId != null) { String path = event.getPath(); - LOGGER.info("Navigating: {} ({})", navId, path); + LOGGER.debug("Navigating: {} ({})", navId, path); } else { - LOGGER.info("Navigating root"); + LOGGER.debug("Navigating root"); } try { diff --git a/src/main/java/org/dsa/iot/haystack/helpers/ConnectionHelper.java b/src/main/java/org/dsa/iot/haystack/helpers/ConnectionHelper.java index 8954662..20cc846 100644 --- a/src/main/java/org/dsa/iot/haystack/helpers/ConnectionHelper.java +++ b/src/main/java/org/dsa/iot/haystack/helpers/ConnectionHelper.java @@ -6,6 +6,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.dsa.iot.dslink.node.Node; import org.dsa.iot.dslink.node.value.Value; @@ -42,6 +43,7 @@ public class ConnectionHelper { private volatile String url; private volatile int connectTimeout; private volatile int readTimeout; + private volatile Semaphore maxConnections; private final Node statusNode; private ScheduledFuture connectFuture; @@ -54,6 +56,7 @@ public ConnectionHelper(Haystack haystack, this.haystack = haystack; this.watchEnabled = watchEnabled; this.watchDisabled = watchDisabled; + this.maxConnections = new Semaphore(haystack.getMaxConnections(), true); Node node = haystack.getNode(); username = node.getConfig("username").getString(); @@ -65,7 +68,7 @@ public ConnectionHelper(Haystack haystack, } public void editConnection(String url, String user, String pass, int connTimeout, - int readTimeout) { + int readTimeout, int maxConnections) { close(); statusNode.setValue(new Value("Not Connected")); this.url = url; @@ -75,6 +78,7 @@ public void editConnection(String url, String user, String pass, int connTimeout } this.connectTimeout = connTimeout; this.readTimeout = readTimeout; + this.maxConnections = new Semaphore(maxConnections, true); getClient(null); } @@ -134,8 +138,12 @@ public void handle(HClient event) { } public void getClient(StateHandler onClientReceived) { + Semaphore semaphore = maxConnections; try { + semaphore.acquire(); connect(onClientReceived); + } catch (InterruptedException x) { + throw new RuntimeException(x); } catch (CallErrException cee) { if (onClientReceived != null && onClientReceived.incrementRetryCount() > 1) { throw cee; @@ -163,27 +171,31 @@ public void getClient(StateHandler onClientReceived) { if (rethrow) { throw x; } + } finally { + semaphore.release(); } } private void connect(Handler onConnected) { synchronized (lock) { if (connectFuture == null && client != null) { - if (onConnected != null) { - onConnected.handle(client); - } - return; + //passthru } else if (connectFuture != null) { if (onConnected != null) { queue.add(onConnected); } return; + } else { + close(); + ScheduledThreadPoolExecutor stpe = Objects.getDaemonThreadPool(); + Connector c = new Connector(onConnected); + TimeUnit u = TimeUnit.SECONDS; + connectFuture = stpe.scheduleWithFixedDelay(c, 0, 5, u); + return; } - close(); - ScheduledThreadPoolExecutor stpe = Objects.getDaemonThreadPool(); - Connector c = new Connector(onConnected); - TimeUnit u = TimeUnit.SECONDS; - connectFuture = stpe.scheduleWithFixedDelay(c, 0, 5, u); + } + if (onConnected != null) { + onConnected.handle(client); } } diff --git a/src/main/java/org/dsa/iot/haystack/helpers/SubscriptionController.java b/src/main/java/org/dsa/iot/haystack/helpers/SubscriptionController.java index 48b8a91..d432f60 100644 --- a/src/main/java/org/dsa/iot/haystack/helpers/SubscriptionController.java +++ b/src/main/java/org/dsa/iot/haystack/helpers/SubscriptionController.java @@ -35,7 +35,7 @@ public void childSubscribed(Node child) { } if (wasEmpty) { if (id != null) { - LOGGER.info("Subscribing " + node.getDisplayName()); + LOGGER.debug("Subscribing " + node.getDisplayName()); haystack.subscribe(id, node); } } @@ -51,7 +51,7 @@ public void childUnsubscribed(Node child) { } if (empty) { if (id != null) { - LOGGER.info("Unsubscribing " + node.getDisplayName()); + LOGGER.debug("Unsubscribing " + node.getDisplayName()); haystack.unsubscribe(id); } }