Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement progressive call results #559

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 50 additions & 22 deletions autobahn/src/main/java/io/crossbar/autobahn/wamp/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,25 @@ private void onMessage(IMessage message) throws Exception {
"RESULT received for non-pending request ID %s", msg.request));
}

mCallRequests.remove(msg.request);
if (request.resultTypeRef != null) {
// FIXME: check args length > 1 and == 0, and kwargs != null
// we cannot currently POJO automap these cases!
request.onReply.complete(mSerializer.convertValue(
msg.args.get(0), request.resultTypeRef));
} else if (request.resultTypeClass != null) {
request.onReply.complete(mSerializer.convertValue(
msg.args.get(0), request.resultTypeClass));
if (msg.options.containsKey("progress") && (Boolean) msg.options.get("progress")) {
if (request.options.progressHandler == null) {
throw new ProtocolError("Caller not accepting progressive call result");
}

request.options.progressHandler.onProgress(new CallResult(msg.args, msg.kwargs));
} else {
request.onReply.complete(new CallResult(msg.args, msg.kwargs));
mCallRequests.remove(msg.request);
if (request.resultTypeRef != null) {
// FIXME: check args length > 1 and == 0, and kwargs != null
// we cannot currently POJO automap these cases!
request.onReply.complete(mSerializer.convertValue(
msg.args.get(0), request.resultTypeRef));
} else if (request.resultTypeClass != null) {
request.onReply.complete(mSerializer.convertValue(
msg.args.get(0), request.resultTypeClass));
} else {
request.onReply.complete(new CallResult(msg.args, msg.kwargs));
}
}
} else if (message instanceof Subscribed) {
Subscribed msg = (Subscribed) message;
Expand Down Expand Up @@ -452,10 +460,21 @@ private void onMessage(IMessage message) throws Exception {
long callerSessionID = getOrDefault(msg.details, "caller", -1L);
String callerAuthID = getOrDefault(msg.details, "caller_authid", null);
String callerAuthRole = getOrDefault(msg.details, "caller_authrole", null);

InvocationDetails details = new InvocationDetails(
registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this);

Boolean progress = getOrDefault(msg.details, "receive_progress", false);
InvocationDetails details;
if (progress) {
details = new InvocationDetails(
registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this,
(args, kwargs) -> {
HashMap<String, Object> options = new HashMap<>();
options.put("progress", true);
send(new Yield(msg.request, args, kwargs, options));
});
} else {
details = new InvocationDetails(
registration, registration.procedure, callerSessionID, callerAuthID, callerAuthRole, this, null);
}
runAsync(() -> {
Object result;
if (registration.endpoint instanceof Supplier) {
Expand Down Expand Up @@ -494,22 +513,22 @@ private void onMessage(IMessage message) throws Exception {
}

} else {
send(new Yield(msg.request, invocRes.results, invocRes.kwresults));
send(new Yield(msg.request, invocRes.results, invocRes.kwresults, null));
}
}, mExecutor);
} else if (result instanceof InvocationResult) {
InvocationResult res = (InvocationResult) result;
send(new Yield(msg.request, res.results, res.kwresults));
send(new Yield(msg.request, res.results, res.kwresults, null));
} else if (result instanceof List) {
send(new Yield(msg.request, (List) result, null));
send(new Yield(msg.request, (List) result, null, null));
} else if (result instanceof Map) {
send(new Yield(msg.request, null, (Map) result));
send(new Yield(msg.request, null, (Map) result, null));
} else if (result instanceof Void) {
send(new Yield(msg.request, null, null));
send(new Yield(msg.request, null, null, null));
} else {
List<Object> item = new ArrayList<>();
item.add(result);
send(new Yield(msg.request, item, null));
send(new Yield(msg.request, item, null, null));
}
}, mExecutor).whenCompleteAsync((aVoid, throwable) -> {
// FIXME: implement better errors
Expand Down Expand Up @@ -1082,9 +1101,10 @@ private <T> CompletableFuture<T> reallyCall(
resultTypeReference, resultTypeClass));

if (options == null) {
send(new Call(requestID, procedure, args, kwargs, 0));
send(new Call(requestID, procedure, args, kwargs, 0, false));
} else {
send(new Call(requestID, procedure, args, kwargs, options.timeout));
boolean receiveProgress = options.progressHandler != null;
send(new Call(requestID, procedure, args, kwargs, options.timeout, receiveProgress));
}
return future;
}
Expand Down Expand Up @@ -1286,7 +1306,15 @@ private CompletableFuture<SessionDetails> reallyJoin(
roles.put("publisher", new HashMap<>());
roles.put("subscriber", new HashMap<>());
roles.put("caller", new HashMap<>());
roles.put("callee", new HashMap<>());

Map<String, Object> calleeFeatures = new HashMap<>();
calleeFeatures.put("progressive_call_results", true);
calleeFeatures.put("call_canceling", true);

Map<String, Object> callee = new HashMap<>();
callee.put("features", calleeFeatures);
roles.put("callee", callee);

if (mAuthenticators == null) {
send(new Hello(realm, roles));
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.crossbar.autobahn.wamp.interfaces;

import java.util.List;
import java.util.Map;

public interface Progress {
void sendProgress(List<Object> args, Map<String, Object> kwargs);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.crossbar.autobahn.wamp.interfaces;

import io.crossbar.autobahn.wamp.types.CallResult;

public interface ProgressHandler {
void onProgress(CallResult result);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ public class Call implements IMessage {
public final List<Object> args;
public final Map<String, Object> kwargs;
public final int timeout;
public final boolean receiveProgress;

public Call(long request, String procedure, List<Object> args, Map<String, Object> kwargs, int timeout) {
public Call(long request, String procedure, List<Object> args, Map<String, Object> kwargs,
int timeout, boolean receiveProgress) {
this.request = request;
this.procedure = procedure;
this.args = args;
Expand All @@ -45,6 +47,7 @@ public Call(long request, String procedure, List<Object> args, Map<String, Objec
} else {
this.timeout = timeout;
}
this.receiveProgress = receiveProgress;
}

public static Call parse(List<Object> wmsg) {
Expand All @@ -69,7 +72,9 @@ public static Call parse(List<Object> wmsg) {

int timeout = getOrDefault(options, "timeout", TIMEOUT_DEFAULT);

return new Call(request, procedure, args, kwargs, timeout);
boolean receiveProgress = getOrDefault(options, "receive_progress", false);

return new Call(request, procedure, args, kwargs, timeout, receiveProgress);
}

@Override
Expand All @@ -81,6 +86,9 @@ public List<Object> marshal() {
if (timeout > TIMEOUT_DEFAULT) {
options.put("timeout", timeout);
}
if (receiveProgress) {
options.put("receive_progress", true);
}
marshaled.add(options);
marshaled.add(procedure);
if (kwargs != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,20 @@ public class Result implements IMessage {
public final long request;
public final List<Object> args;
public final Map<String, Object> kwargs;
public final Map<String, Object> options;

public Result(long request, List<Object> args, Map<String, Object> kwargs) {
public Result(long request, List<Object> args, Map<String, Object> kwargs, Map<String, Object> options) {
this.request = request;
this.args = args;
this.kwargs = kwargs;
this.options = options;
}

public static Result parse(List<Object> wmsg) {
MessageUtil.validateMessage(wmsg, MESSAGE_TYPE, "RESULT", 3, 5);

long request = MessageUtil.parseLong(wmsg.get(1));
Map<String, Object> options = (Map<String, Object>) wmsg.get(2);
List<Object> args = null;
if (wmsg.size() > 3) {
if (wmsg.get(3) instanceof byte[]) {
Expand All @@ -48,15 +51,19 @@ public static Result parse(List<Object> wmsg) {
if (wmsg.size() > 4) {
kwargs = (Map<String, Object>) wmsg.get(4);
}
return new Result(request, args, kwargs);
return new Result(request, args, kwargs, options);
}

@Override
public List<Object> marshal() {
List<Object> marshaled = new ArrayList<>();
marshaled.add(MESSAGE_TYPE);
marshaled.add(request);
marshaled.add(Collections.emptyMap());
if (options == null) {
marshaled.add(Collections.emptyMap());
} else {
marshaled.add(options);
}
if (kwargs != null) {
if (args == null) {
// Empty args.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@ public class Yield implements IMessage {
public final long request;
public final List<Object> args;
public final Map<String, Object> kwargs;
public final Map<String, Object> options;

public Yield(long request, List<Object> args, Map<String, Object> kwargs) {
public Yield(long request, List<Object> args, Map<String, Object> kwargs, Map<String, Object> options) {
this.request = request;
this.args = args;
this.kwargs = kwargs;
this.options = options;
}

public static Yield parse(List<Object> wmsg) {
Expand All @@ -50,16 +52,19 @@ public static Yield parse(List<Object> wmsg) {
if (wmsg.size() > 4) {
kwargs = (Map<String, Object>) wmsg.get(4);
}
return new Yield(MessageUtil.parseLong(wmsg.get(1)), args, kwargs);
return new Yield(MessageUtil.parseLong(wmsg.get(1)), args, kwargs, options);
}

@Override
public List<Object> marshal() {
List<Object> marshaled = new ArrayList<>();
marshaled.add(MESSAGE_TYPE);
marshaled.add(request);
// Empty options.
marshaled.add(Collections.emptyMap());
if (options == null) {
marshaled.add(Collections.emptyMap());
} else {
marshaled.add(options);
}
if (kwargs != null) {
if (args == null) {
// Empty args.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,22 @@

package io.crossbar.autobahn.wamp.types;

import io.crossbar.autobahn.wamp.interfaces.ProgressHandler;

public class CallOptions {
public final int timeout;
public int timeout;
public ProgressHandler progressHandler;

public CallOptions(int timeout) {
this.timeout = timeout;
}

public CallOptions(ProgressHandler progressHandler) {
this.progressHandler = progressHandler;
}

public CallOptions(int timeout, ProgressHandler progressHandler) {
this.timeout = timeout;
this.progressHandler = progressHandler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package io.crossbar.autobahn.wamp.types;

import io.crossbar.autobahn.wamp.Session;
import io.crossbar.autobahn.wamp.interfaces.Progress;

public class InvocationDetails {

Expand All @@ -33,18 +34,18 @@ public class InvocationDetails {
// The WAMP session on which this event is delivered.
public final Session session;

// FIXME
// we need a progress() callback here to allow
// the user to produce progressive results.
// callback produce progressive results.
public final Progress progress;

// XXXX - Tentative, the constructor parameter order may change.
public InvocationDetails(Registration registration, String procedure, long callerSessionID,
String callerAuthID, String callerAuthRole, Session session) {
String callerAuthID, String callerAuthRole, Session session, Progress progress) {
this.registration = registration;
this.procedure = procedure;
this.callerSessionID = callerSessionID;
this.callerAuthID = callerAuthID;
this.callerAuthRole = callerAuthRole;
this.session = session;
this.progress = progress;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.crossbar.autobahn.demogallery;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import io.crossbar.autobahn.wamp.Client;
import io.crossbar.autobahn.wamp.Session;
import io.crossbar.autobahn.wamp.types.CallOptions;
import io.crossbar.autobahn.wamp.types.CallResult;
import io.crossbar.autobahn.wamp.types.ExitInfo;
import io.crossbar.autobahn.wamp.types.InvocationDetails;
import io.crossbar.autobahn.wamp.types.InvocationResult;
import io.crossbar.autobahn.wamp.types.Registration;

public class ProgressiveCallResultsExample {

public static CompletableFuture<ExitInfo> registerProgressive(String wsAddress, String realm) {
Session wampSession = new Session();
wampSession.addOnJoinListener((session, details) -> {
CompletableFuture<Registration> regFuture = session.register(
"io.crossbar.longop",
(List<Object> args, Map<String, Object> kwargs, InvocationDetails invocationDetails) -> {
for (int i = 0; i < 5; i++) {
List<Object> argsList = new ArrayList<>();
argsList.add(i);
invocationDetails.progress.sendProgress(argsList, null);
}
List<Object> resultArgs = new ArrayList<>();
resultArgs.add(7);
return CompletableFuture.completedFuture(new InvocationResult(resultArgs));
});

regFuture.whenComplete((registration, throwable) -> {
System.out.println(String.format(
"Registered procedure %s", registration.procedure));
});
});

Client wampClient = new Client(wampSession, wsAddress, realm);
return wampClient.connect();
}


public static CompletableFuture<ExitInfo> callProgressive(String wsAddress, String realm) {
Session wampSession = new Session();
wampSession.addOnJoinListener((session, details) -> {
CompletableFuture<CallResult> callFuture = session.call(
"io.crossbar.longop",
new CallOptions(result -> System.out.println("Receive Progress: " + result.results)));

callFuture.whenComplete((callResult, throwable) -> {
System.out.println(String.format("Call result: %s", callResult.results));
});
});

Client wampClient = new Client(wampSession, wsAddress, realm);
return wampClient.connect();
}
}
Loading