diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java index 2c4f348f..21a1eff8 100644 --- a/autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java @@ -328,17 +328,25 @@ private void onMessage(IMessage message) throws Exception { "RESULT received for non-pending request ID %s", msg.request)); } - mCallRequests.remove(msg.request); - if (request.resultTypeRef != null) { - // FIXME: check args length > 1 and == 0, and kwargs != null - // we cannot currently POJO automap these cases! - request.onReply.complete(mSerializer.convertValue( - msg.args.get(0), request.resultTypeRef)); - } else if (request.resultTypeClass != null) { - request.onReply.complete(mSerializer.convertValue( - msg.args.get(0), request.resultTypeClass)); + if (msg.options.containsKey("progress") && (Boolean) msg.options.get("progress")) { + if (request.options.progressHandler == null) { + throw new ProtocolError("Caller not accepting progressive call result"); + } + + request.options.progressHandler.onProgress(new CallResult(msg.args, msg.kwargs)); } else { - request.onReply.complete(new CallResult(msg.args, msg.kwargs)); + mCallRequests.remove(msg.request); + if (request.resultTypeRef != null) { + // FIXME: check args length > 1 and == 0, and kwargs != null + // we cannot currently POJO automap these cases! + request.onReply.complete(mSerializer.convertValue( + msg.args.get(0), request.resultTypeRef)); + } else if (request.resultTypeClass != null) { + request.onReply.complete(mSerializer.convertValue( + msg.args.get(0), request.resultTypeClass)); + } else { + request.onReply.complete(new CallResult(msg.args, msg.kwargs)); + } } } else if (message instanceof Subscribed) { Subscribed msg = (Subscribed) message; @@ -452,10 +460,21 @@ private void onMessage(IMessage message) throws Exception { long callerSessionID = getOrDefault(msg.details, "caller", -1L); String callerAuthID = getOrDefault(msg.details, "caller_authid", null); String callerAuthRole = getOrDefault(msg.details, "caller_authrole", null); - - InvocationDetails details = new InvocationDetails( - registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this); + Boolean progress = getOrDefault(msg.details, "receive_progress", false); + InvocationDetails details; + if (progress) { + details = new InvocationDetails( + registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this, + (args, kwargs) -> { + HashMap options = new HashMap<>(); + options.put("progress", true); + send(new Yield(msg.request, args, kwargs, options)); + }); + } else { + details = new InvocationDetails( + registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this, null); + } runAsync(() -> { Object result; if (registration.endpoint instanceof Supplier) { @@ -494,22 +513,22 @@ private void onMessage(IMessage message) throws Exception { } } else { - send(new Yield(msg.request, invocRes.results, invocRes.kwresults)); + send(new Yield(msg.request, invocRes.results, invocRes.kwresults, null)); } }, mExecutor); } else if (result instanceof InvocationResult) { InvocationResult res = (InvocationResult) result; - send(new Yield(msg.request, res.results, res.kwresults)); + send(new Yield(msg.request, res.results, res.kwresults, null)); } else if (result instanceof List) { - send(new Yield(msg.request, (List) result, null)); + send(new Yield(msg.request, (List) result, null, null)); } else if (result instanceof Map) { - send(new Yield(msg.request, null, (Map) result)); + send(new Yield(msg.request, null, (Map) result, null)); } else if (result instanceof Void) { - send(new Yield(msg.request, null, null)); + send(new Yield(msg.request, null, null, null)); } else { List item = new ArrayList<>(); item.add(result); - send(new Yield(msg.request, item, null)); + send(new Yield(msg.request, item, null, null)); } }, mExecutor).whenCompleteAsync((aVoid, throwable) -> { // FIXME: implement better errors @@ -1082,9 +1101,10 @@ private CompletableFuture reallyCall( resultTypeReference, resultTypeClass)); if (options == null) { - send(new Call(requestID, procedure, args, kwargs, 0)); + send(new Call(requestID, procedure, args, kwargs, 0, false)); } else { - send(new Call(requestID, procedure, args, kwargs, options.timeout)); + boolean receiveProgress = options.progressHandler != null; + send(new Call(requestID, procedure, args, kwargs, options.timeout, receiveProgress)); } return future; } @@ -1286,7 +1306,15 @@ private CompletableFuture reallyJoin( roles.put("publisher", new HashMap<>()); roles.put("subscriber", new HashMap<>()); roles.put("caller", new HashMap<>()); - roles.put("callee", new HashMap<>()); + + Map calleeFeatures = new HashMap<>(); + calleeFeatures.put("progressive_call_results", true); + calleeFeatures.put("call_canceling", true); + + Map callee = new HashMap<>(); + callee.put("features", calleeFeatures); + roles.put("callee", callee); + if (mAuthenticators == null) { send(new Hello(realm, roles)); } else { diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/interfaces/Progress.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/interfaces/Progress.java new file mode 100644 index 00000000..6a366175 --- /dev/null +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/interfaces/Progress.java @@ -0,0 +1,8 @@ +package io.crossbar.autobahn.wamp.interfaces; + +import java.util.List; +import java.util.Map; + +public interface Progress { + void sendProgress(List args, Map kwargs); +} diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/interfaces/ProgressHandler.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/interfaces/ProgressHandler.java new file mode 100644 index 00000000..215ee624 --- /dev/null +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/interfaces/ProgressHandler.java @@ -0,0 +1,7 @@ +package io.crossbar.autobahn.wamp.interfaces; + +import io.crossbar.autobahn.wamp.types.CallResult; + +public interface ProgressHandler { + void onProgress(CallResult result); +} diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Call.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Call.java index c3ad21b1..235eca79 100644 --- a/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Call.java +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Call.java @@ -34,8 +34,10 @@ public class Call implements IMessage { public final List args; public final Map kwargs; public final int timeout; + public final boolean receiveProgress; - public Call(long request, String procedure, List args, Map kwargs, int timeout) { + public Call(long request, String procedure, List args, Map kwargs, + int timeout, boolean receiveProgress) { this.request = request; this.procedure = procedure; this.args = args; @@ -45,6 +47,7 @@ public Call(long request, String procedure, List args, Map wmsg) { @@ -69,7 +72,9 @@ public static Call parse(List wmsg) { int timeout = getOrDefault(options, "timeout", TIMEOUT_DEFAULT); - return new Call(request, procedure, args, kwargs, timeout); + boolean receiveProgress = getOrDefault(options, "receive_progress", false); + + return new Call(request, procedure, args, kwargs, timeout, receiveProgress); } @Override @@ -81,6 +86,9 @@ public List marshal() { if (timeout > TIMEOUT_DEFAULT) { options.put("timeout", timeout); } + if (receiveProgress) { + options.put("receive_progress", true); + } marshaled.add(options); marshaled.add(procedure); if (kwargs != null) { diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Result.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Result.java index 7281ff23..1fde178e 100644 --- a/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Result.java +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Result.java @@ -26,17 +26,20 @@ public class Result implements IMessage { public final long request; public final List args; public final Map kwargs; + public final Map options; - public Result(long request, List args, Map kwargs) { + public Result(long request, List args, Map kwargs, Map options) { this.request = request; this.args = args; this.kwargs = kwargs; + this.options = options; } public static Result parse(List wmsg) { MessageUtil.validateMessage(wmsg, MESSAGE_TYPE, "RESULT", 3, 5); long request = MessageUtil.parseLong(wmsg.get(1)); + Map options = (Map) wmsg.get(2); List args = null; if (wmsg.size() > 3) { if (wmsg.get(3) instanceof byte[]) { @@ -48,7 +51,7 @@ public static Result parse(List wmsg) { if (wmsg.size() > 4) { kwargs = (Map) wmsg.get(4); } - return new Result(request, args, kwargs); + return new Result(request, args, kwargs, options); } @Override @@ -56,7 +59,11 @@ public List marshal() { List marshaled = new ArrayList<>(); marshaled.add(MESSAGE_TYPE); marshaled.add(request); - marshaled.add(Collections.emptyMap()); + if (options == null) { + marshaled.add(Collections.emptyMap()); + } else { + marshaled.add(options); + } if (kwargs != null) { if (args == null) { // Empty args. diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Yield.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Yield.java index 69936a9b..9d133af5 100644 --- a/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Yield.java +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/messages/Yield.java @@ -28,11 +28,13 @@ public class Yield implements IMessage { public final long request; public final List args; public final Map kwargs; + public final Map options; - public Yield(long request, List args, Map kwargs) { + public Yield(long request, List args, Map kwargs, Map options) { this.request = request; this.args = args; this.kwargs = kwargs; + this.options = options; } public static Yield parse(List wmsg) { @@ -50,7 +52,7 @@ public static Yield parse(List wmsg) { if (wmsg.size() > 4) { kwargs = (Map) wmsg.get(4); } - return new Yield(MessageUtil.parseLong(wmsg.get(1)), args, kwargs); + return new Yield(MessageUtil.parseLong(wmsg.get(1)), args, kwargs, options); } @Override @@ -58,8 +60,11 @@ public List marshal() { List marshaled = new ArrayList<>(); marshaled.add(MESSAGE_TYPE); marshaled.add(request); - // Empty options. - marshaled.add(Collections.emptyMap()); + if (options == null) { + marshaled.add(Collections.emptyMap()); + } else { + marshaled.add(options); + } if (kwargs != null) { if (args == null) { // Empty args. diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/types/CallOptions.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/types/CallOptions.java index 704f56d4..9d983c28 100644 --- a/autobahn/src/main/java/io/crossbar/autobahn/wamp/types/CallOptions.java +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/types/CallOptions.java @@ -11,10 +11,22 @@ package io.crossbar.autobahn.wamp.types; +import io.crossbar.autobahn.wamp.interfaces.ProgressHandler; + public class CallOptions { - public final int timeout; + public int timeout; + public ProgressHandler progressHandler; public CallOptions(int timeout) { this.timeout = timeout; } + + public CallOptions(ProgressHandler progressHandler) { + this.progressHandler = progressHandler; + } + + public CallOptions(int timeout, ProgressHandler progressHandler) { + this.timeout = timeout; + this.progressHandler = progressHandler; + } } diff --git a/autobahn/src/main/java/io/crossbar/autobahn/wamp/types/InvocationDetails.java b/autobahn/src/main/java/io/crossbar/autobahn/wamp/types/InvocationDetails.java index 11177bc3..7e385733 100644 --- a/autobahn/src/main/java/io/crossbar/autobahn/wamp/types/InvocationDetails.java +++ b/autobahn/src/main/java/io/crossbar/autobahn/wamp/types/InvocationDetails.java @@ -12,6 +12,7 @@ package io.crossbar.autobahn.wamp.types; import io.crossbar.autobahn.wamp.Session; +import io.crossbar.autobahn.wamp.interfaces.Progress; public class InvocationDetails { @@ -33,18 +34,18 @@ public class InvocationDetails { // The WAMP session on which this event is delivered. public final Session session; - // FIXME - // we need a progress() callback here to allow - // the user to produce progressive results. + // callback produce progressive results. + public final Progress progress; // XXXX - Tentative, the constructor parameter order may change. public InvocationDetails(Registration registration, String procedure, long callerSessionID, - String callerAuthID, String callerAuthRole, Session session) { + String callerAuthID, String callerAuthRole, Session session, Progress progress) { this.registration = registration; this.procedure = procedure; this.callerSessionID = callerSessionID; this.callerAuthID = callerAuthID; this.callerAuthRole = callerAuthRole; this.session = session; + this.progress = progress; } } diff --git a/demo-gallery/src/main/java/io/crossbar/autobahn/demogallery/ProgressiveCallResultsExample.java b/demo-gallery/src/main/java/io/crossbar/autobahn/demogallery/ProgressiveCallResultsExample.java new file mode 100644 index 00000000..746f2ece --- /dev/null +++ b/demo-gallery/src/main/java/io/crossbar/autobahn/demogallery/ProgressiveCallResultsExample.java @@ -0,0 +1,61 @@ +package io.crossbar.autobahn.demogallery; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import io.crossbar.autobahn.wamp.Client; +import io.crossbar.autobahn.wamp.Session; +import io.crossbar.autobahn.wamp.types.CallOptions; +import io.crossbar.autobahn.wamp.types.CallResult; +import io.crossbar.autobahn.wamp.types.ExitInfo; +import io.crossbar.autobahn.wamp.types.InvocationDetails; +import io.crossbar.autobahn.wamp.types.InvocationResult; +import io.crossbar.autobahn.wamp.types.Registration; + +public class ProgressiveCallResultsExample { + + public static CompletableFuture registerProgressive(String wsAddress, String realm) { + Session wampSession = new Session(); + wampSession.addOnJoinListener((session, details) -> { + CompletableFuture regFuture = session.register( + "io.crossbar.longop", + (List args, Map kwargs, InvocationDetails invocationDetails) -> { + for (int i = 0; i < 5; i++) { + List argsList = new ArrayList<>(); + argsList.add(i); + invocationDetails.progress.sendProgress(argsList, null); + } + List resultArgs = new ArrayList<>(); + resultArgs.add(7); + return CompletableFuture.completedFuture(new InvocationResult(resultArgs)); + }); + + regFuture.whenComplete((registration, throwable) -> { + System.out.println(String.format( + "Registered procedure %s", registration.procedure)); + }); + }); + + Client wampClient = new Client(wampSession, wsAddress, realm); + return wampClient.connect(); + } + + + public static CompletableFuture callProgressive(String wsAddress, String realm) { + Session wampSession = new Session(); + wampSession.addOnJoinListener((session, details) -> { + CompletableFuture callFuture = session.call( + "io.crossbar.longop", + new CallOptions(result -> System.out.println("Receive Progress: " + result.results))); + + callFuture.whenComplete((callResult, throwable) -> { + System.out.println(String.format("Call result: %s", callResult.results)); + }); + }); + + Client wampClient = new Client(wampSession, wsAddress, realm); + return wampClient.connect(); + } +}