Skip to content

Commit

Permalink
Merge PR Matthias247#112
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramiro Aparicio committed Jul 16, 2018
2 parents 8a7d49c + 6793d7b commit ec0db6c
Show file tree
Hide file tree
Showing 15 changed files with 639 additions and 39 deletions.
Binary file added .mvn/wrapper/maven-wrapper.jar
Binary file not shown.
1 change: 1 addition & 0 deletions .mvn/wrapper/maven-wrapper.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
distributionUrl=https://repo1.maven.org/maven2/org/apache/maven/apache-maven/3.5.2/apache-maven-3.5.2-bin.zip
16 changes: 13 additions & 3 deletions jawampa-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,27 @@
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.0.8</version>
<version>1.0.17</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.4.4</version>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>jackson-dataformat-msgpack</artifactId>
<version>0.7.0-p7</version>
<version>0.8.13</version>
</dependency>
<dependency>
<groupId>de.undercouch</groupId>
<artifactId>bson4jackson</artifactId>
<version>2.7.0</version>
</dependency>
</dependencies>
</project>
23 changes: 19 additions & 4 deletions jawampa-core/src/main/java/ws/wamp/jawampa/ApplicationError.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

package ws.wamp.jawampa;

import ws.wamp.jawampa.connection.IWampConnectorProvider;

import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import ws.wamp.jawampa.connection.IWampConnectorProvider;

/**
* Base class for all WAMP related exceptions that can/may be
Expand All @@ -46,11 +45,11 @@ public ObjectNode keywordArguments() {
}

public ApplicationError(String uri) {
this(uri, null, null);
this(uri, (ArrayNode) null, (ObjectNode) null);
}

public ApplicationError(String uri, ArrayNode args) {
this(uri, args, null);
this(uri, args, (ObjectNode) null);
}

public ApplicationError(String uri, ArrayNode args, ObjectNode kwArgs) {
Expand All @@ -61,6 +60,22 @@ public ApplicationError(String uri, ArrayNode args, ObjectNode kwArgs) {
this.kwArgs = kwArgs;
}

public ApplicationError(String uri, Throwable cause) {
this(uri, null, null, cause);
}

public ApplicationError(String uri, ArrayNode args, Throwable cause) {
this(uri, args, null, cause);
}

public ApplicationError(String uri, ArrayNode args, ObjectNode kwArgs, Throwable cause) {
super(uri, cause);
if (uri == null) throw new NullPointerException();
this.uri = uri;
this.args = args;
this.kwArgs = kwArgs;
}

@Override
public String toString() {
StringBuilder s = new StringBuilder();
Expand Down
8 changes: 7 additions & 1 deletion jawampa-core/src/main/java/ws/wamp/jawampa/CallFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,11 @@ public enum CallFlags {
* Set the disclose_me flag on the Call message to true.<br>
* This will allow the remote call endpoint to access information on this client's WAMP session.
*/
DiscloseMe
DiscloseMe,

MatchExact, //default, exact procedure URI match and not necessary to specify but will override if multiple match flags are present
MatchPrefix, // match for all precedure calls that have the given uri prefix
MatchWildcard,

ReceiveProgress // Should progress messages be sent if the call is long running
}
4 changes: 3 additions & 1 deletion jawampa-core/src/main/java/ws/wamp/jawampa/PublishFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,7 @@ public enum PublishFlags {
* between client and router is established. If the flag is set the response from the router
* to a publish message will be used as a result for a publish request.
*/
RequireAcknowledge;
RequireAcknowledge,

DiscloseMe;
}
19 changes: 18 additions & 1 deletion jawampa-core/src/main/java/ws/wamp/jawampa/RegisterFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,22 @@ public enum RegisterFlags {
/**
* the registered procedure is called with the caller's sessionID as part of the call details object.
*/
DiscloseCaller;
DiscloseCaller,

/**
* How are duplicate registrations of this procedure handled?
*/
InvokeSingle, // default, duplicate registrations cause errors and not necessary to specify but will override if multiple invoke flags are present
InvokeLast, // the last client to register is called when the precedure is executed
InvokeFirst, // the first client to register is called when the precedure is executed
InvokeRoundRobin, // all clients that are registered will be called in a round robin fashion
InvokeRandom, // all clients that are registered will be called in a random fashion

/**
* How is this procedure registration matched against procedure call uris
*/
MatchExact, //default, exact procedure URI match and not necessary to specify but will override if multiple match flags are present
MatchPrefix, // match for all precedure calls that have the given uri prefix
MatchWildcard // matching using .. wildcards
;
}
148 changes: 138 additions & 10 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@

package ws.wamp.jawampa;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.net.URI;
import java.util.Arrays;
import java.util.EnumSet;
Expand All @@ -34,9 +39,6 @@
import ws.wamp.jawampa.internal.ArgArrayBuilder;
import ws.wamp.jawampa.internal.Promise;
import ws.wamp.jawampa.internal.UriValidator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
* Provides the client-side functionality for WAMP.<br>
Expand Down Expand Up @@ -448,6 +450,61 @@ public T call(PubSubData ev) {
});
}

public <T> Observable<T> makeSubscription(final String topic, SubscriptionFlags flags, final JavaType eventType)
{
return makeSubscription(topic, flags).map(new Func1<PubSubData,T>() {
@Override
public T call(PubSubData ev) {
if (eventType == null) {
// We don't need a value
return null;
}

if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));

JsonNode eventNode = ev.arguments.get(0);
if (eventNode.isNull()) return null;

T eventValue;
try {
eventValue = clientConfig.objectMapper().convertValue(eventNode, eventType);
} catch (IllegalArgumentException e) {
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.INVALID_VALUE_TYPE, e));
}
return eventValue;
}
});
}

public <T> Observable<T> makeSubscription(final String topic, SubscriptionFlags flags, final TypeReference<?> typeReference)
{
return makeSubscription(topic, flags).map(new Func1<PubSubData,T>() {
@Override
public T call(PubSubData ev) {
if (typeReference == null) {
// We don't need a value
return null;
}

if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));

JsonNode eventNode = ev.arguments.get(0);
if (eventNode.isNull()) return null;

T eventValue;
try {
eventValue = clientConfig.objectMapper().convertValue(eventNode, typeReference);
} catch (IllegalArgumentException e) {
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.INVALID_VALUE_TYPE));
}
return eventValue;
}
});
}


/**
* Returns an observable that allows to subscribe on the given topic.<br>
* The actual subscription will only be made after subscribe() was called
Expand Down Expand Up @@ -479,17 +536,16 @@ public <T> Observable<EventDetails<T>> makeSubscriptionWithDetails(final String
return makeSubscription(topic, flags).map(new Func1<PubSubData,EventDetails<T>>() {
@Override
public EventDetails<T> call(PubSubData ev) {
if (eventClass == null || eventClass == Void.class) {
// We don't need a value
return null;
}

//get the complete topic name
//which may not be the same as method parameter 'topic' during wildcard or prefix subscriptions
//which may not be the same as method parameter 'topic' during wildcard or prefix subscriptions
String actualTopic = null;
if(ev.details != null && ev.details.get("topic") != null){
actualTopic = ev.details.get("topic").asText();
}
if (eventClass == null || eventClass == Void.class) {
// We don't need a value
return new EventDetails<T>(null, actualTopic);
}

if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));
Expand All @@ -500,7 +556,9 @@ public EventDetails<T> call(PubSubData ev) {
} else {
eventNode = ev.arguments;
}
if (eventNode.isNull()) return null;
if (eventNode.isNull()) {
return new EventDetails<T>(null, actualTopic);
}

T eventValue;
try {
Expand All @@ -513,6 +571,76 @@ public EventDetails<T> call(PubSubData ev) {
});
}

public <T> Observable<EventDetails<T>> makeSubscriptionWithDetails(final String topic, SubscriptionFlags flags, final JavaType javaType)
{
return makeSubscription(topic, flags).map(new Func1<PubSubData,EventDetails<T>>() {
@Override
public EventDetails<T> call(PubSubData ev) {
//get the complete topic name
//which may not be the same as method parameter 'topic' during wildcard or prefix subscriptions
String actualTopic = null;
if(ev.details != null && ev.details.get("topic") != null){
actualTopic = ev.details.get("topic").asText();
}
if (javaType == null) {
// We don't need a value
return new EventDetails<T>(null, actualTopic);
}

if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));

JsonNode eventNode = ev.arguments.get(0);
if (eventNode.isNull()) {
return null;
}

T eventValue;
try {
eventValue = clientConfig.objectMapper().convertValue(eventNode, javaType);
} catch (IllegalArgumentException e) {
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.INVALID_VALUE_TYPE));
}
return new EventDetails<T>(eventValue, actualTopic);
}
});
}

public <T> Observable<EventDetails<T>> makeSubscriptionWithDetails(final String topic, SubscriptionFlags flags, final TypeReference<?> typeRef)
{
return makeSubscription(topic, flags).map(new Func1<PubSubData,EventDetails<T>>() {
@Override
public EventDetails<T> call(PubSubData ev) {
//get the complete topic name
//which may not be the same as method parameter 'topic' during wildcard or prefix subscriptions
String actualTopic = null;
if(ev.details != null && ev.details.get("topic") != null){
actualTopic = ev.details.get("topic").asText();
}
if (typeRef == null) {
// We don't need a value
return new EventDetails<T>(null, actualTopic);
}

if (ev.arguments == null || ev.arguments.size() < 1)
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.MISSING_VALUE));

JsonNode eventNode = ev.arguments.get(0);
if (eventNode.isNull()) return new EventDetails<T>(null, actualTopic);

T eventValue;
try {
eventValue = clientConfig.objectMapper().convertValue(eventNode, typeRef);
} catch (IllegalArgumentException e) {
throw OnErrorThrowable.from(new ApplicationError(ApplicationError.INVALID_VALUE_TYPE));
}
return new EventDetails<T>(eventValue, actualTopic);
}
});
}



/**
* Returns an observable that allows to subscribe on the given topic.<br>
* The actual subscription will only be made after subscribe() was called
Expand Down
4 changes: 4 additions & 0 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampError.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public class WampError extends Exception {
public WampError(String message) {
super(message);
}

public WampError(String message, Throwable cause) {
super(message, cause);
}
}
20 changes: 15 additions & 5 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampSerialization.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package ws.wamp.jawampa;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.msgpack.jackson.dataformat.MessagePackFactory;

import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import de.undercouch.bson4jackson.BsonFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.msgpack.jackson.dataformat.MessagePackFactory;

/**
* Possible serialization methods for WAMP
Expand All @@ -32,6 +32,9 @@ public enum WampSerialization {
Invalid("", true, null),
/** Use the JSON serialization */
Json("wamp.2.json", true, new ObjectMapper()),
/** Use universal binary JSON (BSON) serialization */
Ubjson("wamp.2.ubjson", false, new ObjectMapper(new BsonFactory())),
Cbor("wamp.2.cbor", false, new ObjectMapper(new CBORFactory())),
/** Use the MessagePack serialization */
MessagePack("wamp.2.msgpack", false, new ObjectMapper(new MessagePackFactory()));

Expand Down Expand Up @@ -60,8 +63,13 @@ public String toString() {

public static WampSerialization fromString(String serialization) {
if (serialization == null) return Invalid;
else if (serialization.equals("wamp.2.json")) return Json;
else if (serialization.equals("wamp.2.msgpack")) return MessagePack;

serialization = serialization.toLowerCase();

if (serialization.contains("ubjson")) return Ubjson;
else if (serialization.contains("json")) return Json;
else if (serialization.contains("cbor")) return Cbor;
else if (serialization.contains("msgpack")) return MessagePack;
return Invalid;
}

Expand All @@ -79,6 +87,8 @@ public static String makeWebsocketSubprotocolList(List<WampSerialization> serial

public static void addDefaultSerializations(List<WampSerialization> serializations) {
serializations.add(Json);
serializations.add(Ubjson);
serializations.add(Cbor);
serializations.add(MessagePack);
}

Expand Down
Loading

0 comments on commit ec0db6c

Please sign in to comment.