Skip to content

Commit

Permalink
Merge pull request #8 from Sitrica/beta
Browse files Browse the repository at this point in the history
1.1.3
  • Loading branch information
TheLimeGlass authored Sep 23, 2020
2 parents 186f69d + 4503fef commit 9bbcaad
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 72 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ plugins {

jar.archiveName = project.name + '.jar'
// Add SNAPSHOT to make this publish as a beta.
version '1.1.2'
version '1.1.3'

sourceCompatibility = 1.8

Expand Down
38 changes: 26 additions & 12 deletions src/main/java/com/sitrica/japson/client/JapsonClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

public class JapsonClient extends Japson {

private static final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

protected long HEARTBEAT = 1000L, DELAY = 1000L; // in milliseconds.

protected final InetAddress address;
protected final int port;

private boolean check, valid = true;
private final Gson gson;

Expand Down Expand Up @@ -50,20 +53,23 @@ public JapsonClient(String host, int port, Gson gson) throws UnknownHostExceptio
}

public JapsonClient(InetAddress address, int port, Gson gson) {
super(address, port);
this.address = address;
this.port = port;
this.gson = gson;
HeartbeatPacket packet = new HeartbeatPacket(password, port);
}

public JapsonClient start() {
executor.scheduleAtFixedRate(() -> {
try {
Boolean success = sendPacket(address, port, packet, gson);
if (check && success)
Boolean success = sendPacket(new HeartbeatPacket(password, port));
if (check && success != null && success)
valid = true;
} catch (TimeoutException | InterruptedException | ExecutionException e) {
valid = false;
}
}, DELAY, HEARTBEAT, TimeUnit.MILLISECONDS);
if (debug)
logger.atInfo().log("Started Japson client bound to %s.", address.getHostAddress() + ":" + port);
logger.atInfo().log("Started Japson client bound to %s.", address.getHostAddress() + ":" + port);
return this;
}

@Override
Expand Down Expand Up @@ -118,6 +124,14 @@ public JapsonClient enableDebug() {
return this;
}

public InetAddress getAddress() {
return address;
}

public int getPort() {
return port;
}

public void shutdown() {
executor.shutdown();
}
Expand All @@ -126,14 +140,14 @@ public void kill() {
executor.shutdownNow();
}

public <T> T sendPacket(ReturnablePacket<T> japsonPacket) throws TimeoutException, InterruptedException, ExecutionException {
if (!valid)
public <T> T sendPacket(ReturnablePacket<T> packet) throws TimeoutException, InterruptedException, ExecutionException {
if (check && !valid && !(packet instanceof HeartbeatPacket))
throw new TimeoutException("No connection to the server. Cancelling sending packet.");
return sendPacket(address, port, japsonPacket, gson);
return super.sendPacket(address, port, packet, gson);
}

public void sendPacket(Packet japsonPacket) {
sendPacket(address, port, japsonPacket, gson);
public void sendPacket(Packet packet) throws InterruptedException, ExecutionException, TimeoutException {
super.sendPacket(address, port, packet, gson);
}

}
12 changes: 11 additions & 1 deletion src/main/java/com/sitrica/japson/server/Connections.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
Expand All @@ -22,6 +23,7 @@

public class Connections extends Handler {

private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
private final LoadingCache<InetSocketAddress, JapsonConnection> disconnected;
private final List<JapsonConnection> connections = new ArrayList<>();
private final Set<Listener> listeners = new HashSet<>();
Expand Down Expand Up @@ -54,7 +56,7 @@ public JapsonConnection load(InetSocketAddress address) throws Exception {
}
});
listeners.addAll(japson.getListeners());
Executors.newSingleThreadScheduledExecutor().schedule(() -> {
executor.schedule(() -> {
for (JapsonConnection connection : connections) {
if (System.currentTimeMillis() - connection.getLastUpdate() < japson.getTimeout())
continue;
Expand Down Expand Up @@ -120,6 +122,14 @@ public JsonObject handle(InetAddress address, int packetPort, JsonObject json) {
return returning;
}

public void shutdown() {
executor.shutdown();
}

public void kill() {
executor.shutdownNow();
}

public class JapsonConnection {

private long updated = System.currentTimeMillis();
Expand Down
39 changes: 32 additions & 7 deletions src/main/java/com/sitrica/japson/server/JapsonServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

public class JapsonServer extends Japson {

private static final ExecutorService executor = Executors.newCachedThreadPool();
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final ExecutorService executor = Executors.newCachedThreadPool();
protected final Set<Listener> listeners = new HashSet<>();
private final Set<Integer> ignored = new HashSet<>();
private final SocketHandler handler;

protected final InetAddress address;
protected final int port;

private long RECONNECT = 5, EXPIRY = 10; // EXPIRY in minutes, DISCONNECT is amount.
private final Connections connections;
Expand Down Expand Up @@ -54,14 +57,16 @@ public JapsonServer(String host, int port, Gson gson) throws UnknownHostExceptio
}

public JapsonServer(InetAddress address, int port, Gson gson) throws SocketException {
super(address, port);
this.address = address;
this.port = port;
this.gson = gson;
this.socket = new DatagramSocket(port);
this.socket = new DatagramSocket(port, address);
socket.setSoTimeout(TIMEOUT);
connections = new Connections(this);
handlers.add(connections);
executor.execute(new SocketHandler(PACKET_SIZE, this, socket));
if (debug)
logger.atInfo().log("Started Japson server bound to %s.", address.getHostAddress() + ":" + port);
handler = new SocketHandler(PACKET_SIZE, this, socket);
executor.execute(handler);
logger.atInfo().log("Started Japson server bound to %s.", address.getHostAddress() + ":" + port);
}

@Override
Expand Down Expand Up @@ -148,14 +153,34 @@ public FluentLogger getLogger() {
return logger;
}

public InetAddress getAddress() {
return address;
}

public int getPort() {
return port;
}

public long getTimeout() {
return TIMEOUT;
}

public void shutdown() {
connections.shutdown();
socket.disconnect();
socket.close();
handler.stop();
executor.shutdown();
}

public void kill() {
connections.kill();
socket.disconnect();
socket.close();
handler.stop();
executor.shutdownNow();
}

public Gson getGson() {
return gson;
}
Expand Down
28 changes: 11 additions & 17 deletions src/main/java/com/sitrica/japson/server/SocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
Expand All @@ -16,8 +14,6 @@

public class SocketHandler implements Runnable {

private static final ExecutorService executor = Executors.newSingleThreadExecutor();

private final DatagramSocket socket;
private final JapsonServer japson;
private final int packetSize;
Expand All @@ -30,19 +26,11 @@ public SocketHandler(int packetSize, JapsonServer japson, DatagramSocket socket)
this.socket = socket;
}

void shutdown() {
executor.shutdown();
running = false;
}

void kill() {
executor.shutdownNow();
running = false;
}

@Override
public void run() {
while (running) {
if (socket.isClosed())
break;
try {
byte[] buf = new byte[packetSize];
DatagramPacket packet = new DatagramPacket(buf, buf.length);
Expand All @@ -56,11 +44,11 @@ public void run() {
int id = input.readInt();
String data = input.readUTF();
if (data == null) {
japson.getLogger().atSevere().log("Recieved packet with id %s and the json was null.", id);
japson.getLogger().atSevere().log("Received packet with id %s and the json was null.", id);
return;
}
if (japson.isDebug() && !japson.getIgnoredPackets().contains(id))
japson.getLogger().atInfo().log("Recieved packet with id %s and data %s", id, data);
if (japson.isDebug() && (japson.getIgnoredPackets().isEmpty() || !japson.getIgnoredPackets().contains(id)))
japson.getLogger().atInfo().log("Received packet with id %s and data %s", id, data);
// Handle
JsonObject object = JsonParser.parseString(data).getAsJsonObject();
japson.getHandlers().stream()
Expand All @@ -75,6 +63,8 @@ public void run() {
out.writeUTF(json);
byte[] returnBuf = out.toByteArray();
try {
if (socket.isClosed())
return;
socket.send(new DatagramPacket(returnBuf, returnBuf.length, packet.getAddress(), packet.getPort()));
if (japson.isDebug())
japson.getLogger().atInfo().log("Returning data %s as packet id %s", json, id);
Expand All @@ -88,4 +78,8 @@ public void run() {
}
}

public void stop() {
running = false;
}

}
44 changes: 14 additions & 30 deletions src/main/java/com/sitrica/japson/shared/Japson.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,15 @@

public abstract class Japson {

protected static final FluentLogger logger = FluentLogger.forEnclosingClass();
protected final FluentLogger logger = FluentLogger.forEnclosingClass();
protected final Set<InetAddress> acceptable = new HashSet<>();
protected final Set<Handler> handlers = new HashSet<>();

protected final InetAddress address;
protected final int port;

protected int PACKET_SIZE = 1024; // UDP standard
protected int TIMEOUT = 2000; // milliseconds
protected String password;
protected boolean debug;

protected Japson(InetAddress address, int port) {
this.address = address;
this.port = port;
}

public Japson registerHandlers(Handler... handlers) {
Sets.newHashSet(handlers).stream()
.filter(handler -> !this.handlers.stream().anyMatch(existing -> existing.getID() == handler.getID()))
Expand Down Expand Up @@ -81,10 +73,6 @@ public FluentLogger getLogger() {
return logger;
}

public InetAddress getAddress() {
return address;
}

public boolean hasPassword() {
return password != null;
}
Expand All @@ -93,12 +81,8 @@ public boolean isDebug() {
return debug;
}

public int getPort() {
return port;
}

public <T> T sendPacket(InetAddress address, int port, ReturnablePacket<T> japsonPacket) throws TimeoutException, InterruptedException, ExecutionException {
return sendPacket(address, port, japsonPacket, new GsonBuilder()
public <T> T sendPacket(InetAddress address, int port, ReturnablePacket<T> packet) throws TimeoutException, InterruptedException, ExecutionException {
return sendPacket(address, port, packet, new GsonBuilder()
.enableComplexMapKeySerialization()
.serializeNulls()
.setLenient()
Expand All @@ -112,13 +96,14 @@ public <T> T sendPacket(InetAddress address, int port, ReturnablePacket<T> japso
out.writeInt(japsonPacket.getID());
out.writeUTF(gson.toJson(japsonPacket.toJson()));
byte[] buf = out.toByteArray();
DatagramPacket packet = new DatagramPacket(buf, buf.length, address, port);
socket.send(packet);
socket.setSoTimeout(TIMEOUT);
socket.send(new DatagramPacket(buf, buf.length, address, port));
// Reset the byte buffer
buf = new byte[PACKET_SIZE];
ByteArrayDataInput input = new ReceiverFuture(logger, this, socket)
.create(new DatagramPacket(buf, buf.length))
.get();
.get(TIMEOUT, TimeUnit.MILLISECONDS);
socket.close();
if (input == null) {
logger.atSevere().log("Packet with id %s returned null or an incorrect readable object for Japson", japsonPacket.getID());
return null;
Expand All @@ -140,34 +125,33 @@ public <T> T sendPacket(InetAddress address, int port, ReturnablePacket<T> japso
logger.atSevere().withCause(exception)
.atMostEvery(15, TimeUnit.SECONDS)
.log("IO error: " + exception.getMessage());
} catch (InterruptedException | ExecutionException exception) {
logger.atSevere().withCause(exception)
.atMostEvery(15, TimeUnit.SECONDS)
.log("Timeout: " + exception.getMessage());
} catch (InterruptedException | ExecutionException | TimeoutException exception) {
// Already handled seperate.
}
return null;
}).get(TIMEOUT, TimeUnit.MILLISECONDS);
}

public void sendPacket(InetAddress address, int port, Packet japsonPacket) {
public void sendPacket(InetAddress address, int port, Packet japsonPacket) throws InterruptedException, ExecutionException, TimeoutException {
sendPacket(address, port, japsonPacket, new GsonBuilder()
.enableComplexMapKeySerialization()
.serializeNulls()
.setLenient()
.create());
}

public void sendPacket(InetAddress address, int port, Packet japsonPacket, Gson gson) {
public void sendPacket(InetAddress address, int port, Packet japsonPacket, Gson gson) throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture.runAsync(() -> {
try (DatagramSocket socket = new DatagramSocket()) {
ByteArrayDataOutput out = ByteStreams.newDataOutput();
out.writeInt(japsonPacket.getID());
out.writeUTF(gson.toJson(japsonPacket.toJson()));
byte[] buf = out.toByteArray();
socket.setSoTimeout((int)(3000));
socket.setSoTimeout(TIMEOUT);
socket.send(new DatagramPacket(buf, buf.length, address, port));
if (debug)
logger.atInfo().log("Sent non-returnable packet with id %s", japsonPacket.getID());
socket.close();
} catch (SocketException socketException) {
logger.atSevere().withCause(socketException)
.atMostEvery(15, TimeUnit.SECONDS)
Expand All @@ -177,7 +161,7 @@ public void sendPacket(InetAddress address, int port, Packet japsonPacket, Gson
.atMostEvery(15, TimeUnit.SECONDS)
.log("IO error: " + exception.getMessage());
}
});
}).get(TIMEOUT, TimeUnit.MILLISECONDS);
}

}
Loading

0 comments on commit 9bbcaad

Please sign in to comment.