Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

Inner Process Clients and Session Meta API. #93

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
be4f3c0
IdGenerator is made Java6 execution environment compatible. On Java 7…
alex-vas Aug 24, 2016
a4b5248
ServerText example tidy ups meant to make it a bit easier to read the…
alex-vas Aug 24, 2016
bf21840
Added svn:ignores the the appropriate folders to excude .settings/ an…
alex-vas Aug 24, 2016
7a9d5c7
WampRouter now has the createInnerProccessClient public method added,…
alex-vas Aug 24, 2016
030c1b8
Added withMetaApi option to the Router. It would enable session notif…
alex-vas Aug 25, 2016
db77f4d
Missed out one function rename.
alex-vas Aug 25, 2016
e456e0d
Added disclose_caller support to the router. disclose_publisher and c…
alex-vas Aug 26, 2016
397a8f6
Added disclose_publisher functionality to the router. Both disclose_c…
alex-vas Aug 26, 2016
02e29f1
Appeared that I have missed few changes to checkin. Also, there is a …
alex-vas Aug 29, 2016
88e6a02
createInnerProcessClient() is renamed to createInProcessClient(), as …
alex-vas Aug 29, 2016
5a0868d
InProcessClinet now shares the eventLoop with the router. This would …
alex-vas Aug 29, 2016
99f002d
session_meta_api and caller_identification and publisher_identificati…
alex-vas Aug 29, 2016
445c781
Fixed an issue where WampServerBuilder would not pass the withDisclos…
alex-vas Sep 14, 2016
15511ae
Fixed an issue where the maven-jar-pugin were using too old, not API …
alex-vas Oct 6, 2016
dd0c27c
Updated to use netty 4.1.6.Final
alex-vas Apr 26, 2017
58d3210
Fixed an issue with NPE thrown for the certain way of using it.
alex-vas Jul 21, 2017
02cb827
Added parameter for the WAMP router to perform WebSockets level PING …
alex-vas Nov 24, 2017
c35ba08
ECC Server. Fixed the issue with PONG responce. It used to return PON…
alex-vas Nov 28, 2017
8debcf1
Added feature of specifying "keep alive" and its parameters for the c…
alex-vas Nov 29, 2017
e0c134a
Fixed regression introduced with the change for PONG which supposed t…
alex-vas Nov 29, 2017
501fcee
Improved disconnected condition detection. Now we are sending one las…
alex-vas Mar 18, 2018
653d49a
Java. Two finels modified as a result of migration to eclipse photon …
alex-vas Jul 3, 2018
4472130
svn:ignores updated
alex-vas Nov 7, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions jawampa-core/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
Expand Down
2 changes: 1 addition & 1 deletion jawampa-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>2.6</version>
<version>3.0.2</version>
<configuration>
<archive>
<manifest>
Expand Down
186 changes: 170 additions & 16 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class WampRouterBuilder {

Map<String, RealmConfig> realms = new HashMap<String, RealmConfig>();

boolean metaApiEnabled;

boolean discloseCaller;
boolean disclosePublisher;

public WampRouterBuilder() {

}
Expand All @@ -47,7 +52,7 @@ public WampRouter build() throws ApplicationError {
if (realms.size() == 0)
throw new ApplicationError(ApplicationError.INVALID_REALM);

return new WampRouter(realms);
return new WampRouter(realms, metaApiEnabled, discloseCaller, disclosePublisher);
}

/**
Expand Down Expand Up @@ -97,4 +102,23 @@ public WampRouterBuilder addRealm(String realmName, WampRoles[] roles, boolean u

return this;
}

public WampRouterBuilder withMetaApiEnabled()
{
metaApiEnabled = true;
return this;
}

public WampRouterBuilder withDiscloseCaller()
{
discloseCaller = true;
return this;
}

public WampRouterBuilder withDisclosePublisher()
{
disclosePublisher = true;
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,39 @@
package ws.wamp.jawampa.internal;

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.Random;

/**
* Contains method for generating WAMP IDs
*/
public class IdGenerator {

private static Random oldRandom;

private static Class<?> threadLocalRandomClass;

private static Random getRandomGenerator()
{
if (oldRandom == null && threadLocalRandomClass == null) {
try {
threadLocalRandomClass = ClassLoader.getSystemClassLoader().loadClass("java.util.concurrent.ThreadLocalRandom"); // Java 7+
}
catch (ClassNotFoundException e) {
// fall back to an old, Java 6, low performance Random().
oldRandom = new Random();
}
}
if (threadLocalRandomClass != null) {
try {
return (Random) threadLocalRandomClass.getMethod("current").invoke(null);
}
catch (Exception e) {
throw new IllegalStateException(e);
}
}
return oldRandom;
}

/**
* Generates a new ID through a random generator.<br>
* If the new ID is not valid or is already in use as a key in the provided Map
Expand All @@ -34,7 +60,7 @@ public class IdGenerator {
*/
public static long newRandomId(Map<Long, ?> controlMap) {
for (;;) {
long l = ThreadLocalRandom.current().nextLong();
long l = getRandomGenerator().nextLong();
if (l < IdValidator.MIN_VALID_ID || l > IdValidator.MAX_VALID_ID) continue;
if (controlMap == null || !controlMap.containsKey(l)) return l;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,41 +50,48 @@ public static void main(String[] args) {

public void start() {

WampRouterBuilder routerBuilder = new WampRouterBuilder();
URI serverUri = URI.create("ws://0.0.0.0:8080/ws1");

// Build router and server
WampRouter router;
try {
routerBuilder.addRealm("realm1");
router = routerBuilder.build();
} catch (ApplicationError e1) {
e1.printStackTrace();
return;
SimpleWampWebsocketListener server;
{
WampRouterBuilder routerBuilder = new WampRouterBuilder();
try {
routerBuilder.addRealm("realm1");
router = routerBuilder.build();

server = new SimpleWampWebsocketListener(router, serverUri, null);
server.start();

} catch (ApplicationError e1) {
e1.printStackTrace();
return;
}
}

URI serverUri = URI.create("ws://0.0.0.0:8080/ws1");
SimpleWampWebsocketListener server;

IWampConnectorProvider connectorProvider = new NettyWampClientConnectorProvider();
WampClientBuilder builder = new WampClientBuilder();

// Build two clients
// Build socket client1
final WampClient client1;
final WampClient client2;
try {
server = new SimpleWampWebsocketListener(router, serverUri, null);
server.start();

builder.withConnectorProvider(connectorProvider)
.withUri("ws://localhost:8080/ws1")
.withRealm("realm1")
.withInfiniteReconnects()
.withReconnectInterval(3, TimeUnit.SECONDS);
client1 = builder.build();
client2 = builder.build();
} catch (Exception e) {
e.printStackTrace();
return;
{
IWampConnectorProvider connectorProvider = new NettyWampClientConnectorProvider();
WampClientBuilder builder = new WampClientBuilder();
try {
builder.withConnectorProvider(connectorProvider)
.withUri("ws://localhost:8080/ws1")
.withRealm("realm1")
.withInfiniteReconnects()
.withReconnectInterval(3, TimeUnit.SECONDS);
client1 = builder.build();
} catch (Exception e) {
e.printStackTrace();
return;
}
}

// Build inner process client2
final WampClient client2 = router.createInProcessClient("realm1");

// Setup client1
client1.statusChanged().subscribe(new Action1<WampClient.State>() {
@Override
public void call(WampClient.State t1) {
Expand Down Expand Up @@ -126,6 +133,7 @@ public void call() {
}
});

// Setup client2
client2.statusChanged().subscribe(new Action1<WampClient.State>() {
@Override
public void call(WampClient.State t1) {
Expand Down Expand Up @@ -197,6 +205,7 @@ public void call() {
}
});

// Open clients
client1.open();
client2.open();

Expand All @@ -209,10 +218,12 @@ public void call() {
}
}, eventInterval, eventInterval, TimeUnit.MILLISECONDS);

// Wait for a key press and then shutdown
waitUntilKeypressed();
System.out.println("Stopping subscription");
if (eventSubscription != null)
if (eventSubscription != null) {
eventSubscription.unsubscribe();
}

waitUntilKeypressed();
System.out.println("Stopping publication");
Expand Down
1 change: 1 addition & 0 deletions jawampa-netty/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
<attributes>
<attribute name="optional" value="true"/>
<attribute name="maven.pomderived" value="true"/>
<attribute name="test" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
Expand Down
7 changes: 6 additions & 1 deletion jawampa-netty/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>4.0.24.Final</version>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.6.Final</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import ws.wamp.jawampa.WampSerialization;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
Expand All @@ -50,13 +51,17 @@
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

/**
* Returns factory methods for the establishment of WAMP connections between
Expand Down Expand Up @@ -268,13 +273,56 @@ protected void initChannel(SocketChannel ch) {
uri.getHost(),
port));
}
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(8192),
new WebSocketClientProtocolHandler(handshaker, false),
new WebSocketFrameAggregator(WampHandlerConfiguration.MAX_WEBSOCKET_FRAME_SIZE),
new WampClientWebsocketHandler(handshaker),
connectionHandler);

p.addLast(new HttpClientCodec());
p.addLast(new HttpObjectAggregator(8192));

boolean keepAlive = nettyConfig.getPingPeriodSeconds() > 0 && nettyConfig.getPingTimeoutSeconds() > 0;
if (keepAlive)
{
p.addLast("keep-alive-idle-state-handler", new IdleStateHandler(
nettyConfig.getPingPeriodSeconds() + nettyConfig.getPingTimeoutSeconds() /*reader timeout*/,
nettyConfig.getPingPeriodSeconds() /*writer timeout*/,
0));
}

p.addLast(new WebSocketClientProtocolHandler(handshaker, false));

if (keepAlive)
{
p.addLast("keep-alive-action-handler", new ChannelDuplexHandler() {
private boolean pingTried;
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
if (pingTried) {
pingTried = false;
}
super.read(ctx);
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush(new PingWebSocketFrame());
} else if (e.state() == IdleState.READER_IDLE) {
if (!pingTried) {
ctx.writeAndFlush(new PingWebSocketFrame());
pingTried = true;
} else {
ctx.close();
}
}
} else {
ctx.fireUserEventTriggered(evt);
}
}
});
}

p.addLast(new WebSocketFrameAggregator(WampHandlerConfiguration.MAX_WEBSOCKET_FRAME_SIZE));
p.addLast(new WampClientWebsocketHandler(handshaker));
p.addLast(connectionHandler);
}
});

Expand Down
Loading