diff --git a/build.gradle b/build.gradle index 0093ce0..a22db67 100644 --- a/build.gradle +++ b/build.gradle @@ -9,7 +9,7 @@ plugins { jar.archiveName = project.name + '.jar' // Add SNAPSHOT to make this publish as a beta. -version '1.1.2' +version '1.1.3' sourceCompatibility = 1.8 diff --git a/src/main/java/com/sitrica/japson/client/JapsonClient.java b/src/main/java/com/sitrica/japson/client/JapsonClient.java index c9c1f8f..daa64ba 100644 --- a/src/main/java/com/sitrica/japson/client/JapsonClient.java +++ b/src/main/java/com/sitrica/japson/client/JapsonClient.java @@ -18,10 +18,13 @@ public class JapsonClient extends Japson { - private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); protected long HEARTBEAT = 1000L, DELAY = 1000L; // in milliseconds. + protected final InetAddress address; + protected final int port; + private boolean check, valid = true; private final Gson gson; @@ -50,20 +53,23 @@ public JapsonClient(String host, int port, Gson gson) throws UnknownHostExceptio } public JapsonClient(InetAddress address, int port, Gson gson) { - super(address, port); + this.address = address; + this.port = port; this.gson = gson; - HeartbeatPacket packet = new HeartbeatPacket(password, port); + } + + public JapsonClient start() { executor.scheduleAtFixedRate(() -> { try { - Boolean success = sendPacket(address, port, packet, gson); - if (check && success) + Boolean success = sendPacket(new HeartbeatPacket(password, port)); + if (check && success != null && success) valid = true; } catch (TimeoutException | InterruptedException | ExecutionException e) { valid = false; } }, DELAY, HEARTBEAT, TimeUnit.MILLISECONDS); - if (debug) - logger.atInfo().log("Started Japson client bound to %s.", address.getHostAddress() + ":" + port); + logger.atInfo().log("Started Japson client bound to %s.", address.getHostAddress() + ":" + port); + return this; } @Override @@ -118,6 +124,14 @@ public JapsonClient enableDebug() { return this; } + public InetAddress getAddress() { + return address; + } + + public int getPort() { + return port; + } + public void shutdown() { executor.shutdown(); } @@ -126,14 +140,14 @@ public void kill() { executor.shutdownNow(); } - public T sendPacket(ReturnablePacket japsonPacket) throws TimeoutException, InterruptedException, ExecutionException { - if (!valid) + public T sendPacket(ReturnablePacket packet) throws TimeoutException, InterruptedException, ExecutionException { + if (check && !valid && !(packet instanceof HeartbeatPacket)) throw new TimeoutException("No connection to the server. Cancelling sending packet."); - return sendPacket(address, port, japsonPacket, gson); + return super.sendPacket(address, port, packet, gson); } - public void sendPacket(Packet japsonPacket) { - sendPacket(address, port, japsonPacket, gson); + public void sendPacket(Packet packet) throws InterruptedException, ExecutionException, TimeoutException { + super.sendPacket(address, port, packet, gson); } } diff --git a/src/main/java/com/sitrica/japson/server/Connections.java b/src/main/java/com/sitrica/japson/server/Connections.java index 0d79903..00c6889 100644 --- a/src/main/java/com/sitrica/japson/server/Connections.java +++ b/src/main/java/com/sitrica/japson/server/Connections.java @@ -8,6 +8,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.google.common.cache.CacheBuilder; @@ -22,6 +23,7 @@ public class Connections extends Handler { + private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); private final LoadingCache disconnected; private final List connections = new ArrayList<>(); private final Set listeners = new HashSet<>(); @@ -54,7 +56,7 @@ public JapsonConnection load(InetSocketAddress address) throws Exception { } }); listeners.addAll(japson.getListeners()); - Executors.newSingleThreadScheduledExecutor().schedule(() -> { + executor.schedule(() -> { for (JapsonConnection connection : connections) { if (System.currentTimeMillis() - connection.getLastUpdate() < japson.getTimeout()) continue; @@ -120,6 +122,14 @@ public JsonObject handle(InetAddress address, int packetPort, JsonObject json) { return returning; } + public void shutdown() { + executor.shutdown(); + } + + public void kill() { + executor.shutdownNow(); + } + public class JapsonConnection { private long updated = System.currentTimeMillis(); diff --git a/src/main/java/com/sitrica/japson/server/JapsonServer.java b/src/main/java/com/sitrica/japson/server/JapsonServer.java index 2ce527d..c081612 100644 --- a/src/main/java/com/sitrica/japson/server/JapsonServer.java +++ b/src/main/java/com/sitrica/japson/server/JapsonServer.java @@ -18,10 +18,13 @@ public class JapsonServer extends Japson { - private static final ExecutorService executor = Executors.newCachedThreadPool(); - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + private final ExecutorService executor = Executors.newCachedThreadPool(); protected final Set listeners = new HashSet<>(); private final Set ignored = new HashSet<>(); + private final SocketHandler handler; + + protected final InetAddress address; + protected final int port; private long RECONNECT = 5, EXPIRY = 10; // EXPIRY in minutes, DISCONNECT is amount. private final Connections connections; @@ -54,14 +57,16 @@ public JapsonServer(String host, int port, Gson gson) throws UnknownHostExceptio } public JapsonServer(InetAddress address, int port, Gson gson) throws SocketException { - super(address, port); + this.address = address; + this.port = port; this.gson = gson; - this.socket = new DatagramSocket(port); + this.socket = new DatagramSocket(port, address); + socket.setSoTimeout(TIMEOUT); connections = new Connections(this); handlers.add(connections); - executor.execute(new SocketHandler(PACKET_SIZE, this, socket)); - if (debug) - logger.atInfo().log("Started Japson server bound to %s.", address.getHostAddress() + ":" + port); + handler = new SocketHandler(PACKET_SIZE, this, socket); + executor.execute(handler); + logger.atInfo().log("Started Japson server bound to %s.", address.getHostAddress() + ":" + port); } @Override @@ -148,14 +153,34 @@ public FluentLogger getLogger() { return logger; } + public InetAddress getAddress() { + return address; + } + + public int getPort() { + return port; + } + public long getTimeout() { return TIMEOUT; } public void shutdown() { + connections.shutdown(); + socket.disconnect(); + socket.close(); + handler.stop(); executor.shutdown(); } + public void kill() { + connections.kill(); + socket.disconnect(); + socket.close(); + handler.stop(); + executor.shutdownNow(); + } + public Gson getGson() { return gson; } diff --git a/src/main/java/com/sitrica/japson/server/SocketHandler.java b/src/main/java/com/sitrica/japson/server/SocketHandler.java index 47528f4..3a8e7bb 100644 --- a/src/main/java/com/sitrica/japson/server/SocketHandler.java +++ b/src/main/java/com/sitrica/japson/server/SocketHandler.java @@ -4,8 +4,6 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import com.google.common.io.ByteArrayDataInput; import com.google.common.io.ByteArrayDataOutput; @@ -16,8 +14,6 @@ public class SocketHandler implements Runnable { - private static final ExecutorService executor = Executors.newSingleThreadExecutor(); - private final DatagramSocket socket; private final JapsonServer japson; private final int packetSize; @@ -30,19 +26,11 @@ public SocketHandler(int packetSize, JapsonServer japson, DatagramSocket socket) this.socket = socket; } - void shutdown() { - executor.shutdown(); - running = false; - } - - void kill() { - executor.shutdownNow(); - running = false; - } - @Override public void run() { while (running) { + if (socket.isClosed()) + break; try { byte[] buf = new byte[packetSize]; DatagramPacket packet = new DatagramPacket(buf, buf.length); @@ -56,11 +44,11 @@ public void run() { int id = input.readInt(); String data = input.readUTF(); if (data == null) { - japson.getLogger().atSevere().log("Recieved packet with id %s and the json was null.", id); + japson.getLogger().atSevere().log("Received packet with id %s and the json was null.", id); return; } - if (japson.isDebug() && !japson.getIgnoredPackets().contains(id)) - japson.getLogger().atInfo().log("Recieved packet with id %s and data %s", id, data); + if (japson.isDebug() && (japson.getIgnoredPackets().isEmpty() || !japson.getIgnoredPackets().contains(id))) + japson.getLogger().atInfo().log("Received packet with id %s and data %s", id, data); // Handle JsonObject object = JsonParser.parseString(data).getAsJsonObject(); japson.getHandlers().stream() @@ -75,6 +63,8 @@ public void run() { out.writeUTF(json); byte[] returnBuf = out.toByteArray(); try { + if (socket.isClosed()) + return; socket.send(new DatagramPacket(returnBuf, returnBuf.length, packet.getAddress(), packet.getPort())); if (japson.isDebug()) japson.getLogger().atInfo().log("Returning data %s as packet id %s", json, id); @@ -88,4 +78,8 @@ public void run() { } } + public void stop() { + running = false; + } + } diff --git a/src/main/java/com/sitrica/japson/shared/Japson.java b/src/main/java/com/sitrica/japson/shared/Japson.java index 437431f..ea47ff2 100644 --- a/src/main/java/com/sitrica/japson/shared/Japson.java +++ b/src/main/java/com/sitrica/japson/shared/Japson.java @@ -23,23 +23,15 @@ public abstract class Japson { - protected static final FluentLogger logger = FluentLogger.forEnclosingClass(); + protected final FluentLogger logger = FluentLogger.forEnclosingClass(); protected final Set acceptable = new HashSet<>(); protected final Set handlers = new HashSet<>(); - protected final InetAddress address; - protected final int port; - protected int PACKET_SIZE = 1024; // UDP standard protected int TIMEOUT = 2000; // milliseconds protected String password; protected boolean debug; - protected Japson(InetAddress address, int port) { - this.address = address; - this.port = port; - } - public Japson registerHandlers(Handler... handlers) { Sets.newHashSet(handlers).stream() .filter(handler -> !this.handlers.stream().anyMatch(existing -> existing.getID() == handler.getID())) @@ -81,10 +73,6 @@ public FluentLogger getLogger() { return logger; } - public InetAddress getAddress() { - return address; - } - public boolean hasPassword() { return password != null; } @@ -93,12 +81,8 @@ public boolean isDebug() { return debug; } - public int getPort() { - return port; - } - - public T sendPacket(InetAddress address, int port, ReturnablePacket japsonPacket) throws TimeoutException, InterruptedException, ExecutionException { - return sendPacket(address, port, japsonPacket, new GsonBuilder() + public T sendPacket(InetAddress address, int port, ReturnablePacket packet) throws TimeoutException, InterruptedException, ExecutionException { + return sendPacket(address, port, packet, new GsonBuilder() .enableComplexMapKeySerialization() .serializeNulls() .setLenient() @@ -112,13 +96,14 @@ public T sendPacket(InetAddress address, int port, ReturnablePacket japso out.writeInt(japsonPacket.getID()); out.writeUTF(gson.toJson(japsonPacket.toJson())); byte[] buf = out.toByteArray(); - DatagramPacket packet = new DatagramPacket(buf, buf.length, address, port); - socket.send(packet); + socket.setSoTimeout(TIMEOUT); + socket.send(new DatagramPacket(buf, buf.length, address, port)); // Reset the byte buffer buf = new byte[PACKET_SIZE]; ByteArrayDataInput input = new ReceiverFuture(logger, this, socket) .create(new DatagramPacket(buf, buf.length)) - .get(); + .get(TIMEOUT, TimeUnit.MILLISECONDS); + socket.close(); if (input == null) { logger.atSevere().log("Packet with id %s returned null or an incorrect readable object for Japson", japsonPacket.getID()); return null; @@ -140,16 +125,14 @@ public T sendPacket(InetAddress address, int port, ReturnablePacket japso logger.atSevere().withCause(exception) .atMostEvery(15, TimeUnit.SECONDS) .log("IO error: " + exception.getMessage()); - } catch (InterruptedException | ExecutionException exception) { - logger.atSevere().withCause(exception) - .atMostEvery(15, TimeUnit.SECONDS) - .log("Timeout: " + exception.getMessage()); + } catch (InterruptedException | ExecutionException | TimeoutException exception) { + // Already handled seperate. } return null; }).get(TIMEOUT, TimeUnit.MILLISECONDS); } - public void sendPacket(InetAddress address, int port, Packet japsonPacket) { + public void sendPacket(InetAddress address, int port, Packet japsonPacket) throws InterruptedException, ExecutionException, TimeoutException { sendPacket(address, port, japsonPacket, new GsonBuilder() .enableComplexMapKeySerialization() .serializeNulls() @@ -157,17 +140,18 @@ public void sendPacket(InetAddress address, int port, Packet japsonPacket) { .create()); } - public void sendPacket(InetAddress address, int port, Packet japsonPacket, Gson gson) { + public void sendPacket(InetAddress address, int port, Packet japsonPacket, Gson gson) throws InterruptedException, ExecutionException, TimeoutException { CompletableFuture.runAsync(() -> { try (DatagramSocket socket = new DatagramSocket()) { ByteArrayDataOutput out = ByteStreams.newDataOutput(); out.writeInt(japsonPacket.getID()); out.writeUTF(gson.toJson(japsonPacket.toJson())); byte[] buf = out.toByteArray(); - socket.setSoTimeout((int)(3000)); + socket.setSoTimeout(TIMEOUT); socket.send(new DatagramPacket(buf, buf.length, address, port)); if (debug) logger.atInfo().log("Sent non-returnable packet with id %s", japsonPacket.getID()); + socket.close(); } catch (SocketException socketException) { logger.atSevere().withCause(socketException) .atMostEvery(15, TimeUnit.SECONDS) @@ -177,7 +161,7 @@ public void sendPacket(InetAddress address, int port, Packet japsonPacket, Gson .atMostEvery(15, TimeUnit.SECONDS) .log("IO error: " + exception.getMessage()); } - }); + }).get(TIMEOUT, TimeUnit.MILLISECONDS); } } diff --git a/src/main/java/com/sitrica/japson/shared/ReceiverFuture.java b/src/main/java/com/sitrica/japson/shared/ReceiverFuture.java index aadf35c..c40deb5 100644 --- a/src/main/java/com/sitrica/japson/shared/ReceiverFuture.java +++ b/src/main/java/com/sitrica/japson/shared/ReceiverFuture.java @@ -25,6 +25,8 @@ public ReceiverFuture(FluentLogger logger, Japson japson, DatagramSocket socket) public CompletableFuture create(DatagramPacket packet) { return CompletableFuture.supplyAsync(() -> { while (true) { + if (socket.isClosed()) + return null; try { socket.receive(packet); byte[] data = packet.getData(); diff --git a/src/test/java/com/sitrica/japson/ClientTest.java b/src/test/java/com/sitrica/japson/ClientTest.java index bd4a6d0..b40aaae 100644 --- a/src/test/java/com/sitrica/japson/ClientTest.java +++ b/src/test/java/com/sitrica/japson/ClientTest.java @@ -15,12 +15,12 @@ public static void setupClient() { japson = new JapsonClient(1337) .setPassword("test-password") .makeSureConnectionValid() - .enableDebug(); + .enableDebug() + .start(); } catch (UnknownHostException e) { e.printStackTrace(); } assertNotNull(japson); - System.out.println("Client setup on " + japson.getAddress().getHostAddress() + ":" + japson.getPort()); } } diff --git a/src/test/java/com/sitrica/japson/GeneralTest.java b/src/test/java/com/sitrica/japson/GeneralTest.java index 10cd566..491de73 100644 --- a/src/test/java/com/sitrica/japson/GeneralTest.java +++ b/src/test/java/com/sitrica/japson/GeneralTest.java @@ -34,6 +34,7 @@ public void startClient() { @Test @Order(3) public void sendPacket() { + assertEquals(ClientTest.japson.getAddress(), ServerTest.japson.getAddress()); String value = "testing Japson", value2 = "testing Japson 2"; String returned = null, second = null; try { @@ -77,7 +78,6 @@ public JsonObject toJson() { assertEquals(returned, value); assertNotNull(second); assertEquals(second, value2); - ClientTest.japson.getLogger().atInfo().log("All tests were successful!"); } } diff --git a/src/test/java/com/sitrica/japson/ServerTest.java b/src/test/java/com/sitrica/japson/ServerTest.java index 60a22ad..3bbeb36 100644 --- a/src/test/java/com/sitrica/japson/ServerTest.java +++ b/src/test/java/com/sitrica/japson/ServerTest.java @@ -60,7 +60,6 @@ public void execute(InetAddress address, int port, JsonObject object) { e.printStackTrace(); } assertNotNull(japson); - System.out.println("Server started on " + japson.getAddress().getHostAddress() + ":" + japson.getPort()); } }