Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Adding CBOR and BSON serialization support and adding missing flags #112

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .mvn/wrapper/maven-wrapper.jar
Binary file not shown.
1 change: 1 addition & 0 deletions .mvn/wrapper/maven-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.2/apache-maven-3.5.2-bin.zip
16 changes: 13 additions & 3 deletions jawampa-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,27 @@
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.0.8</version>
<version>1.0.17</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.4</version>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>jackson-dataformat-msgpack</artifactId>
<version>0.7.0-p7</version>
<version>0.8.13</version>
</dependency>
<dependency>
<groupId>de.undercouch</groupId>
<artifactId>bson4jackson</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
</project>
23 changes: 19 additions & 4 deletions jawampa-core/src/main/java/ws/wamp/jawampa/ApplicationError.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

package ws.wamp.jawampa;

import ws.wamp.jawampa.connection.IWampConnectorProvider;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import ws.wamp.jawampa.connection.IWampConnectorProvider;

/**
* Base class for all WAMP related exceptions that can/may be
Expand All @@ -46,11 +45,11 @@ public ObjectNode keywordArguments() {
}

public ApplicationError(String uri) {
this(uri, null, null);
this(uri, (ArrayNode) null, (ObjectNode) null);
}

public ApplicationError(String uri, ArrayNode args) {
this(uri, args, null);
this(uri, args, (ObjectNode) null);
}

public ApplicationError(String uri, ArrayNode args, ObjectNode kwArgs) {
Expand All @@ -61,6 +60,22 @@ public ApplicationError(String uri, ArrayNode args, ObjectNode kwArgs) {
this.kwArgs = kwArgs;
}

public ApplicationError(String uri, Throwable cause) {
this(uri, null, null, cause);
}

public ApplicationError(String uri, ArrayNode args, Throwable cause) {
this(uri, args, null, cause);
}

public ApplicationError(String uri, ArrayNode args, ObjectNode kwArgs, Throwable cause) {
super(uri, cause);
if (uri == null) throw new NullPointerException();
this.uri = uri;
this.args = args;
this.kwArgs = kwArgs;
}

@Override
public String toString() {
StringBuilder s = new StringBuilder();
Expand Down
8 changes: 7 additions & 1 deletion jawampa-core/src/main/java/ws/wamp/jawampa/CallFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,11 @@ public enum CallFlags {
* Set the disclose_me flag on the Call message to true.<br>
* This will allow the remote call endpoint to access information on this client's WAMP session.
*/
DiscloseMe
DiscloseMe,

MatchExact, //default, exact procedure URI match and not necessary to specify but will override if multiple match flags are present
MatchPrefix, // match for all precedure calls that have the given uri prefix
MatchWildcard,

ReceiveProgress // Should progress messages be sent if the call is long running
}
4 changes: 3 additions & 1 deletion jawampa-core/src/main/java/ws/wamp/jawampa/PublishFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ public enum PublishFlags {
* between client and router is established. If the flag is set the response from the router
* to a publish message will be used as a result for a publish request.
*/
RequireAcknowledge;
RequireAcknowledge,

DiscloseMe;
}
19 changes: 18 additions & 1 deletion jawampa-core/src/main/java/ws/wamp/jawampa/RegisterFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,22 @@ public enum RegisterFlags {
/**
* the registered procedure is called with the caller's sessionID as part of the call details object.
*/
DiscloseCaller;
DiscloseCaller,

/**
* How are duplicate registrations of this procedure handled?
*/
InvokeSingle, // default, duplicate registrations cause errors and not necessary to specify but will override if multiple invoke flags are present
InvokeLast, // the last client to register is called when the precedure is executed
InvokeFirst, // the first client to register is called when the precedure is executed
InvokeRoundRobin, // all clients that are registered will be called in a round robin fashion
InvokeRandom, // all clients that are registered will be called in a random fashion

/**
* How is this procedure registration matched against procedure call uris
*/
MatchExact, //default, exact procedure URI match and not necessary to specify but will override if multiple match flags are present
MatchPrefix, // match for all precedure calls that have the given uri prefix
MatchWildcard // matching using .. wildcards
;
}
140 changes: 132 additions & 8 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package ws.wamp.jawampa;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.net.URI;
import java.util.Arrays;
import java.util.EnumSet;
Expand All @@ -34,9 +39,6 @@
import ws.wamp.jawampa.internal.ArgArrayBuilder;
import ws.wamp.jawampa.internal.Promise;
import ws.wamp.jawampa.internal.UriValidator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
* Provides the client-side functionality for WAMP.<br>
Expand Down Expand Up @@ -443,6 +445,61 @@ public T call(PubSubData ev) {
});
}

public <T> Observable<T> makeSubscription(final String topic, SubscriptionFlags flags, final JavaType eventType)
{
return makeSubscription(topic, flags).map(new Func1<PubSubData,T>() {
@Override
public T call(PubSubData ev) {
if (eventType == null) {
// We don't need a value
return null;
}

if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));

JsonNode eventNode = ev.arguments.get(0);
if (eventNode.isNull()) return null;

T eventValue;
try {
eventValue = clientConfig.objectMapper().convertValue(eventNode, eventType);
} catch (IllegalArgumentException e) {
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.INVALID_VALUE_TYPE, e));
}
return eventValue;
}
});
}

public <T> Observable<T> makeSubscription(final String topic, SubscriptionFlags flags, final TypeReference<?> typeReference)
{
return makeSubscription(topic, flags).map(new Func1<PubSubData,T>() {
@Override
public T call(PubSubData ev) {
if (typeReference == null) {
// We don't need a value
return null;
}

if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));

JsonNode eventNode = ev.arguments.get(0);
if (eventNode.isNull()) return null;

T eventValue;
try {
eventValue = clientConfig.objectMapper().convertValue(eventNode, typeReference);
} catch (IllegalArgumentException e) {
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.INVALID_VALUE_TYPE));
}
return eventValue;
}
});
}


/**
* Returns an observable that allows to subscribe on the given topic.<br>
* The actual subscription will only be made after subscribe() was called
Expand Down Expand Up @@ -474,27 +531,59 @@ public <T> Observable<EventDetails<T>> makeSubscriptionWithDetails(final String
return makeSubscription(topic, flags).map(new Func1<PubSubData,EventDetails<T>>() {
@Override
public EventDetails<T> call(PubSubData ev) {
//get the complete topic name
//which may not be the same as method parameter 'topic' during wildcard or prefix subscriptions
String actualTopic = null;
if(ev.details != null && ev.details.get("topic") != null){
actualTopic = ev.details.get("topic").asText();
}
if (eventClass == null || eventClass == Void.class) {
// We don't need a value
return null;
return new EventDetails<T>(null, actualTopic);
}


if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));

JsonNode eventNode = ev.arguments.get(0);
if (eventNode.isNull()) return new EventDetails<T>(null, actualTopic);

T eventValue;
try {
eventValue = clientConfig.objectMapper().convertValue(eventNode, eventClass);
} catch (IllegalArgumentException e) {
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.INVALID_VALUE_TYPE));
}
return new EventDetails<T>(eventValue, actualTopic);
}
});
}

public <T> Observable<EventDetails<T>> makeSubscriptionWithDetails(final String topic, SubscriptionFlags flags, final JavaType javaType)
{
return makeSubscription(topic, flags).map(new Func1<PubSubData,EventDetails<T>>() {
@Override
public EventDetails<T> call(PubSubData ev) {
//get the complete topic name
//which may not be the same as method parameter 'topic' during wildcard or prefix subscriptions
//which may not be the same as method parameter 'topic' during wildcard or prefix subscriptions
String actualTopic = null;
if(ev.details != null && ev.details.get("topic") != null){
actualTopic = ev.details.get("topic").asText();
}
if (javaType == null) {
// We don't need a value
return new EventDetails<T>(null, actualTopic);
}

if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));

JsonNode eventNode = ev.arguments.get(0);
if (eventNode.isNull()) return null;
if (eventNode.isNull()) return new EventDetails<T>(null, actualTopic);

T eventValue;
try {
eventValue = clientConfig.objectMapper().convertValue(eventNode, eventClass);
eventValue = clientConfig.objectMapper().convertValue(eventNode, javaType);
} catch (IllegalArgumentException e) {
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.INVALID_VALUE_TYPE));
}
Expand All @@ -503,6 +592,41 @@ public EventDetails<T> call(PubSubData ev) {
});
}

public <T> Observable<EventDetails<T>> makeSubscriptionWithDetails(final String topic, SubscriptionFlags flags, final TypeReference<?> typeRef)
{
return makeSubscription(topic, flags).map(new Func1<PubSubData,EventDetails<T>>() {
@Override
public EventDetails<T> call(PubSubData ev) {
//get the complete topic name
//which may not be the same as method parameter 'topic' during wildcard or prefix subscriptions
String actualTopic = null;
if(ev.details != null && ev.details.get("topic") != null){
actualTopic = ev.details.get("topic").asText();
}
if (typeRef == null) {
// We don't need a value
return new EventDetails<T>(null, actualTopic);
}

if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));

JsonNode eventNode = ev.arguments.get(0);
if (eventNode.isNull()) return new EventDetails<T>(null, actualTopic);

T eventValue;
try {
eventValue = clientConfig.objectMapper().convertValue(eventNode, typeRef);
} catch (IllegalArgumentException e) {
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.INVALID_VALUE_TYPE));
}
return new EventDetails<T>(eventValue, actualTopic);
}
});
}



/**
* Returns an observable that allows to subscribe on the given topic.<br>
* The actual subscription will only be made after subscribe() was called
Expand Down
4 changes: 4 additions & 0 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampError.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public class WampError extends Exception {
public WampError(String message) {
super(message);
}

public WampError(String message, Throwable cause) {
super(message, cause);
}
}
20 changes: 15 additions & 5 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampSerialization.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package ws.wamp.jawampa;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.msgpack.jackson.dataformat.MessagePackFactory;

import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import de.undercouch.bson4jackson.BsonFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.msgpack.jackson.dataformat.MessagePackFactory;

/**
* Possible serialization methods for WAMP
Expand All @@ -32,6 +32,9 @@ public enum WampSerialization {
Invalid("", true, null),
/** Use the JSON serialization */
Json("wamp.2.json", true, new ObjectMapper()),
/** Use universal binary JSON (BSON) serialization */
Ubjson("wamp.2.ubjson", false, new ObjectMapper(new BsonFactory())),
Cbor("wamp.2.cbor", false, new ObjectMapper(new CBORFactory())),
/** Use the MessagePack serialization */
MessagePack("wamp.2.msgpack", false, new ObjectMapper(new MessagePackFactory()));

Expand Down Expand Up @@ -60,8 +63,13 @@ public String toString() {

public static WampSerialization fromString(String serialization) {
if (serialization == null) return Invalid;
else if (serialization.equals("wamp.2.json")) return Json;
else if (serialization.equals("wamp.2.msgpack")) return MessagePack;

serialization = serialization.toLowerCase();

if (serialization.contains("ubjson")) return Ubjson;
else if (serialization.contains("json")) return Json;
else if (serialization.contains("cbor")) return Cbor;
else if (serialization.contains("msgpack")) return MessagePack;
return Invalid;
}

Expand All @@ -79,6 +87,8 @@ public static String makeWebsocketSubprotocolList(List<WampSerialization> serial

public static void addDefaultSerializations(List<WampSerialization> serializations) {
serializations.add(Json);
serializations.add(Ubjson);
serializations.add(Cbor);
serializations.add(MessagePack);
}

Expand Down
Loading