diff --git a/jawampa-core/.classpath b/jawampa-core/.classpath index fd7ad7f..e51e38c 100644 --- a/jawampa-core/.classpath +++ b/jawampa-core/.classpath @@ -10,6 +10,7 @@ + diff --git a/jawampa-core/pom.xml b/jawampa-core/pom.xml index c0968a1..803b5f7 100644 --- a/jawampa-core/pom.xml +++ b/jawampa-core/pom.xml @@ -27,7 +27,7 @@ maven-jar-plugin - 2.6 + 3.0.2 diff --git a/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java b/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java index 55672c2..f9c0d35 100644 --- a/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java +++ b/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java @@ -16,6 +16,7 @@ package ws.wamp.jawampa; +import java.net.URI; import java.util.ArrayList; import java.util.EnumMap; import java.util.HashMap; @@ -37,11 +38,16 @@ import ws.wamp.jawampa.WampMessages.*; import ws.wamp.jawampa.connection.ICompletionCallback; import ws.wamp.jawampa.connection.IConnectionController; +import ws.wamp.jawampa.connection.IPendingWampConnection; +import ws.wamp.jawampa.connection.IPendingWampConnectionListener; +import ws.wamp.jawampa.connection.IWampClientConnectionConfig; import ws.wamp.jawampa.connection.IWampConnection; import ws.wamp.jawampa.connection.IWampConnectionAcceptor; import ws.wamp.jawampa.connection.IWampConnectionFuture; import ws.wamp.jawampa.connection.IWampConnectionListener; import ws.wamp.jawampa.connection.IWampConnectionPromise; +import ws.wamp.jawampa.connection.IWampConnector; +import ws.wamp.jawampa.connection.IWampConnectorProvider; import ws.wamp.jawampa.connection.QueueingConnectionController; import ws.wamp.jawampa.connection.WampConnectionPromise; import ws.wamp.jawampa.internal.IdGenerator; @@ -72,6 +78,9 @@ public class WampRouter { /** Represents a realm that is exposed through the router */ static class Realm { final RealmConfig config; + final WampClient metaApiClient; + final boolean discloseCallerEnabled; + final boolean disclosePublisherEnabled; final ObjectNode welcomeDetails; final Map channelsBySessionId = new HashMap(); final Map procedures = new HashMap(); @@ -82,8 +91,14 @@ static class Realm { final Map subscriptionsById = new HashMap(); long lastUsedSubscriptionId = IdValidator.MIN_VALID_ID; - public Realm(RealmConfig config) { + public Realm(RealmConfig config, WampClient metaApiClient, boolean discloseCallerEnabled, boolean disclosePublisherEnabled) { this.config = config; + this.metaApiClient = metaApiClient; + this.discloseCallerEnabled = discloseCallerEnabled; + this.disclosePublisherEnabled = disclosePublisherEnabled; + if (metaApiClient != null) { + metaApiClient.open(); + } subscriptionsByFlags.put(SubscriptionFlags.Exact, new HashMap()); subscriptionsByFlags.put(SubscriptionFlags.Prefix, new HashMap()); subscriptionsByFlags.put(SubscriptionFlags.Wildcard, new HashMap()); @@ -101,6 +116,12 @@ public Realm(RealmConfig config) { } else if (role == WampRoles.Subscriber) { ObjectNode featuresNode = roleNode.putObject("features"); featuresNode.put("pattern_based_subscription", true); + } else if (metaApiClient != null) { + roleNode.putObject("features").put("session_meta_api", true); + } else if (discloseCallerEnabled) { + roleNode.putObject("features").put("caller_identification", true); + } else if (disclosePublisherEnabled) { + roleNode.putObject("features").put("publisher_identification", true); } } } @@ -160,12 +181,14 @@ void removeChannel(ClientHandler channel, boolean removeFromList) { static class Procedure { final String procName; + final boolean discloseCaller; final ClientHandler provider; final long registrationId; final List pendingCalls = new ArrayList(); - public Procedure(String name, ClientHandler provider, long registrationId) { + public Procedure(String name, boolean discloseCaller, ClientHandler provider, long registrationId) { this.procName = name; + this.discloseCaller = discloseCaller; this.provider = provider; this.registrationId = registrationId; } @@ -181,13 +204,15 @@ static class Invocation { static class Subscription { final String topic; final SubscriptionFlags flags; + final boolean disclosePublisher; final String components[]; // non-null only for wildcard type final long subscriptionId; final Set subscribers; - public Subscription(String topic, SubscriptionFlags flags, long subscriptionId) { + public Subscription(String topic, SubscriptionFlags flags, boolean disclosePublisher, long subscriptionId) { this.topic = topic; this.flags = flags; + this.disclosePublisher = disclosePublisher; this.components = flags == SubscriptionFlags.Wildcard ? topic.split("\\.", -1) : null; this.subscriptionId = subscriptionId; this.subscribers = new HashSet(); @@ -225,15 +250,9 @@ public ObjectMapper objectMapper() { return objectMapper; } - WampRouter(Map realms) { - - // Populate the realms from the configuration - this.realms = new HashMap(); - for (Map.Entry e : realms.entrySet()) { - Realm info = new Realm(e.getValue()); - this.realms.put(e.getKey(), info); - } - + WampRouter(Map realms, boolean metaApiEnabled, + boolean discloseCallerEnabled, boolean disclosePublisherEnabled) + { // Create an eventloop and the RX scheduler on top of it this.eventLoop = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { @Override @@ -246,6 +265,14 @@ public Thread newThread(Runnable r) { this.scheduler = Schedulers.from(eventLoop); idleChannels = new HashSet(); + + // Populate the realms from the configuration + this.realms = new HashMap(); + for (Map.Entry e : realms.entrySet()) { + Realm info = new Realm(e.getValue(), metaApiEnabled ? createInProcessClient(e.getKey()) : null, + discloseCallerEnabled, disclosePublisherEnabled); + this.realms.put(e.getKey(), info); + } } /** @@ -312,6 +339,10 @@ public void run() { idleChannels.clear(); for (Realm ri : realms.values()) { + if (ri.metaApiClient != null) + { + ri.metaApiClient.close(); + } for (ClientHandler channel : ri.channelsBySessionId.values()) { ri.removeChannel(channel, false); channel.markAsClosed(); @@ -397,6 +428,111 @@ public IWampConnectionAcceptor connectionAcceptor() { return connectionAcceptor; } + /** + * Creates connection to the router which is wired directly to the router and bypassing any sockets. + * @param realm The name of the realm to which shall be connected. + * @return The created WAMP client + */ + public WampClient createInProcessClient(String realm) { + return createInProcessClient(realm, null, null); + } + + /** + * Creates connection to the router which is wired directly to the router and bypassing any sockets. + * @param realm The name of the realm to which shall be connected. + * @param clientRoles The set of roles that the client should fulfil in the session. + * At least one role is required, otherwise the session can not be established. + * @param objectMapper The {@link ObjectMapper} instance + * @return The created WAMP client + */ + public WampClient createInProcessClient(String realm, WampRoles[] clientRoles, ObjectMapper objectMapper) { + final IWampConnectionListener[] connectionListenerClient = { null }; + + final IWampConnectionListener connectionListenerRouter = connectionAcceptor().createNewConnectionListener(); + IWampConnection routerConnection = new IWampConnection() { + @Override + public WampSerialization serialization() { + throw new UnsupportedOperationException(); + } + @Override + public void sendMessage(WampMessage message, IWampConnectionPromise promise) { + connectionListenerClient[0].messageReceived(message); + promise.fulfill(null); + } + @Override + public boolean isSingleWriteOnly() { + return false; + } + @Override + public void close(boolean sendRemaining, IWampConnectionPromise promise) { + connectionListenerClient[0].transportClosed(); + promise.fulfill(null); + } + }; + connectionAcceptor().acceptNewConnection(routerConnection, connectionListenerRouter); + + IWampConnectorProvider connectorProvider = new IWampConnectorProvider() { + @Override + public ScheduledExecutorService createScheduler() { + return eventLoop; + } + @Override + public IWampConnector createConnector(URI uri, IWampClientConnectionConfig configuration, + List serializations) throws Exception + { + return new IWampConnector() { + @Override + public IPendingWampConnection connect(ScheduledExecutorService scheduler, + IPendingWampConnectionListener connectListener, + IWampConnectionListener connectionListener) + { + connectionListenerClient[0] = connectionListener; + connectListener.connectSucceeded(new IWampConnection() { + @Override + public WampSerialization serialization() { + throw new UnsupportedOperationException(); + } + @Override + public void sendMessage(WampMessage message, IWampConnectionPromise promise) { + connectionListenerRouter.messageReceived(message); + promise.fulfill(null); + } + @Override + public boolean isSingleWriteOnly() { + return false; + } + @Override + public void close(boolean sendRemaining, IWampConnectionPromise promise) { + connectionListenerRouter.transportClosed(); + promise.fulfill(null); + } + }); + return new IPendingWampConnection() { + @Override + public void cancelConnect() { + // nothing + } + }; + } + }; + } + }; + + WampClientBuilder builder = new WampClientBuilder().withConnectorProvider(connectorProvider) + .withUri("inner-process://example.com").withRealm(realm); + if (clientRoles != null) { + builder.withRoles(clientRoles); + } + if (objectMapper != null) { + builder.withObjectMapper(objectMapper); + } + try { + return builder.build(); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + class ClientHandler implements IWampConnectionListener { IConnectionController controller; @@ -462,6 +598,9 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m closeActiveClient(handler, new GoodbyeMessage(null, ApplicationError.INVALID_ARGUMENT)); } else if (msg instanceof AbortMessage || msg instanceof GoodbyeMessage) { // The client wants to leave the realm + if (handler.realm.metaApiClient != null) { + handler.realm.metaApiClient.publish("wamp.session.on_leave", objectMapper.createArrayNode().add(handler.sessionId), null); + } // Remove the channel from the realm handler.realm.removeChannel(handler, true); // But add it to the list of passive channels @@ -514,8 +653,9 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m proc.pendingCalls.add(invoc); // And send it to the provider + ObjectNode details = invoc.procedure.discloseCaller ? objectMapper.createObjectNode().put("caller", handler.sessionId) : null; InvocationMessage imsg = new InvocationMessage(invoc.invocationRequestId, - proc.registrationId, null, call.arguments, call.argumentsKw); + proc.registrationId, details, call.arguments, call.argumentsKw); proc.provider.controller.sendMessage(imsg, IWampConnectionPromise.Empty); } else if (msg instanceof YieldMessage) { // The clients sends as the result of an RPC @@ -589,7 +729,8 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m // Everything checked, we can register the caller as the procedure provider long registrationId = IdGenerator.newLinearId(handler.lastUsedId, handler.providedProcedures); handler.lastUsedId = registrationId; - Procedure procInfo = new Procedure(reg.procedure, handler, registrationId); + JsonNode discloseCallerNode = handler.realm.discloseCallerEnabled ? reg.options.get("disclose_caller") : null; + Procedure procInfo = new Procedure(reg.procedure, discloseCallerNode != null && discloseCallerNode.asBoolean(), handler, registrationId); // Insert new procedure handler.realm.procedures.put(reg.procedure, procInfo); @@ -719,7 +860,8 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m handler.realm.subscriptionsById); handler.realm.lastUsedSubscriptionId = subscriptionId; // Create and add the new subscription - subscription = new Subscription(sub.topic, flags, subscriptionId); + JsonNode disclosePublisherNode = handler.realm.disclosePublisherEnabled ? sub.options.get("disclose_publisher") : null; + subscription = new Subscription(sub.topic, flags, disclosePublisherNode != null && disclosePublisherNode.asBoolean(), subscriptionId); subscriptionMap.put(sub.topic, subscription); handler.realm.subscriptionsById.put(subscriptionId, subscription); } @@ -863,7 +1005,10 @@ private void publishEvent(ClientHandler publisher, PublishMessage pub, long publ details = objectMapper.createObjectNode(); details.put("topic", pub.topic); } - + if (publisher.realm.disclosePublisherEnabled && subscription.disclosePublisher) { + details.put("publisher", publisher.sessionId); + } + EventMessage ev = new EventMessage(subscription.subscriptionId, publicationId, details, pub.arguments, pub.argumentsKw); @@ -942,11 +1087,20 @@ private void onMessageFromUnregisteredChannel(ClientHandler channelHandler, Wamp // Respond with the WELCOME message WelcomeMessage welcome = new WelcomeMessage(channelHandler.sessionId, realm.welcomeDetails); channelHandler.controller.sendMessage(welcome, IWampConnectionPromise.Empty); + + if (realm.metaApiClient != null) { + // TODO Add authid, authrole, authmethod, authprovider, and optionally transport arguments as per the spec. + realm.metaApiClient.publish("wamp.session.on_join", objectMapper.createObjectNode().put("session", sessionId), null); + } } private void closeActiveClient(ClientHandler channel, WampMessage closeMessage) { if (channel == null) return; + if (channel.realm.metaApiClient != null) { + channel.realm.metaApiClient.publish("wamp.session.on_leave", objectMapper.createArrayNode().add(channel.sessionId), null); + } + channel.realm.removeChannel(channel, true); channel.markAsClosed(); diff --git a/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouterBuilder.java b/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouterBuilder.java index 05c7e11..8aabb8d 100644 --- a/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouterBuilder.java +++ b/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouterBuilder.java @@ -34,6 +34,11 @@ public class WampRouterBuilder { Map realms = new HashMap(); + boolean metaApiEnabled; + + boolean discloseCaller; + boolean disclosePublisher; + public WampRouterBuilder() { } @@ -47,7 +52,7 @@ public WampRouter build() throws ApplicationError { if (realms.size() == 0) throw new ApplicationError(ApplicationError.INVALID_REALM); - return new WampRouter(realms); + return new WampRouter(realms, metaApiEnabled, discloseCaller, disclosePublisher); } /** @@ -97,4 +102,23 @@ public WampRouterBuilder addRealm(String realmName, WampRoles[] roles, boolean u return this; } + + public WampRouterBuilder withMetaApiEnabled() + { + metaApiEnabled = true; + return this; + } + + public WampRouterBuilder withDiscloseCaller() + { + discloseCaller = true; + return this; + } + + public WampRouterBuilder withDisclosePublisher() + { + disclosePublisher = true; + return this; + } + } diff --git a/jawampa-core/src/main/java/ws/wamp/jawampa/internal/IdGenerator.java b/jawampa-core/src/main/java/ws/wamp/jawampa/internal/IdGenerator.java index 4645fe4..712d0b5 100644 --- a/jawampa-core/src/main/java/ws/wamp/jawampa/internal/IdGenerator.java +++ b/jawampa-core/src/main/java/ws/wamp/jawampa/internal/IdGenerator.java @@ -17,13 +17,39 @@ package ws.wamp.jawampa.internal; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; +import java.util.Random; /** * Contains method for generating WAMP IDs */ public class IdGenerator { + private static Random oldRandom; + + private static Class threadLocalRandomClass; + + private static Random getRandomGenerator() + { + if (oldRandom == null && threadLocalRandomClass == null) { + try { + threadLocalRandomClass = ClassLoader.getSystemClassLoader().loadClass("java.util.concurrent.ThreadLocalRandom"); // Java 7+ + } + catch (ClassNotFoundException e) { + // fall back to an old, Java 6, low performance Random(). + oldRandom = new Random(); + } + } + if (threadLocalRandomClass != null) { + try { + return (Random) threadLocalRandomClass.getMethod("current").invoke(null); + } + catch (Exception e) { + throw new IllegalStateException(e); + } + } + return oldRandom; + } + /** * Generates a new ID through a random generator.
* If the new ID is not valid or is already in use as a key in the provided Map @@ -34,7 +60,7 @@ public class IdGenerator { */ public static long newRandomId(Map controlMap) { for (;;) { - long l = ThreadLocalRandom.current().nextLong(); + long l = getRandomGenerator().nextLong(); if (l < IdValidator.MIN_VALID_ID || l > IdValidator.MAX_VALID_ID) continue; if (controlMap == null || !controlMap.containsKey(l)) return l; } diff --git a/jawampa-examples/src/main/java/ws/wamp/jawampa/examples/ServerTest.java b/jawampa-examples/src/main/java/ws/wamp/jawampa/examples/ServerTest.java index e6101ce..42d4ffa 100644 --- a/jawampa-examples/src/main/java/ws/wamp/jawampa/examples/ServerTest.java +++ b/jawampa-examples/src/main/java/ws/wamp/jawampa/examples/ServerTest.java @@ -50,41 +50,48 @@ public static void main(String[] args) { public void start() { - WampRouterBuilder routerBuilder = new WampRouterBuilder(); + URI serverUri = URI.create("ws://0.0.0.0:8080/ws1"); + + // Build router and server WampRouter router; - try { - routerBuilder.addRealm("realm1"); - router = routerBuilder.build(); - } catch (ApplicationError e1) { - e1.printStackTrace(); - return; + SimpleWampWebsocketListener server; + { + WampRouterBuilder routerBuilder = new WampRouterBuilder(); + try { + routerBuilder.addRealm("realm1"); + router = routerBuilder.build(); + + server = new SimpleWampWebsocketListener(router, serverUri, null); + server.start(); + + } catch (ApplicationError e1) { + e1.printStackTrace(); + return; + } } - URI serverUri = URI.create("ws://0.0.0.0:8080/ws1"); - SimpleWampWebsocketListener server; - - IWampConnectorProvider connectorProvider = new NettyWampClientConnectorProvider(); - WampClientBuilder builder = new WampClientBuilder(); - - // Build two clients + // Build socket client1 final WampClient client1; - final WampClient client2; - try { - server = new SimpleWampWebsocketListener(router, serverUri, null); - server.start(); - - builder.withConnectorProvider(connectorProvider) - .withUri("ws://localhost:8080/ws1") - .withRealm("realm1") - .withInfiniteReconnects() - .withReconnectInterval(3, TimeUnit.SECONDS); - client1 = builder.build(); - client2 = builder.build(); - } catch (Exception e) { - e.printStackTrace(); - return; + { + IWampConnectorProvider connectorProvider = new NettyWampClientConnectorProvider(); + WampClientBuilder builder = new WampClientBuilder(); + try { + builder.withConnectorProvider(connectorProvider) + .withUri("ws://localhost:8080/ws1") + .withRealm("realm1") + .withInfiniteReconnects() + .withReconnectInterval(3, TimeUnit.SECONDS); + client1 = builder.build(); + } catch (Exception e) { + e.printStackTrace(); + return; + } } + // Build inner process client2 + final WampClient client2 = router.createInProcessClient("realm1"); + + // Setup client1 client1.statusChanged().subscribe(new Action1() { @Override public void call(WampClient.State t1) { @@ -126,6 +133,7 @@ public void call() { } }); + // Setup client2 client2.statusChanged().subscribe(new Action1() { @Override public void call(WampClient.State t1) { @@ -197,6 +205,7 @@ public void call() { } }); + // Open clients client1.open(); client2.open(); @@ -209,10 +218,12 @@ public void call() { } }, eventInterval, eventInterval, TimeUnit.MILLISECONDS); + // Wait for a key press and then shutdown waitUntilKeypressed(); System.out.println("Stopping subscription"); - if (eventSubscription != null) + if (eventSubscription != null) { eventSubscription.unsubscribe(); + } waitUntilKeypressed(); System.out.println("Stopping publication"); diff --git a/jawampa-netty/.classpath b/jawampa-netty/.classpath index fd7ad7f..e51e38c 100644 --- a/jawampa-netty/.classpath +++ b/jawampa-netty/.classpath @@ -10,6 +10,7 @@ +
diff --git a/jawampa-netty/pom.xml b/jawampa-netty/pom.xml index 794e158..b9ce98c 100644 --- a/jawampa-netty/pom.xml +++ b/jawampa-netty/pom.xml @@ -48,7 +48,12 @@ io.netty netty-codec-http - 4.0.24.Final + 4.1.6.Final + + + io.netty + netty-handler + 4.1.6.Final diff --git a/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/NettyWampClientConnectorProvider.java b/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/NettyWampClientConnectorProvider.java index 936a209..94972a4 100644 --- a/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/NettyWampClientConnectorProvider.java +++ b/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/NettyWampClientConnectorProvider.java @@ -36,6 +36,7 @@ import ws.wamp.jawampa.WampSerialization; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -50,6 +51,7 @@ import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; @@ -57,6 +59,9 @@ import io.netty.handler.codec.http.websocketx.WebSocketVersion; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; /** * Returns factory methods for the establishment of WAMP connections between @@ -268,13 +273,56 @@ protected void initChannel(SocketChannel ch) { uri.getHost(), port)); } - p.addLast( - new HttpClientCodec(), - new HttpObjectAggregator(8192), - new WebSocketClientProtocolHandler(handshaker, false), - new WebSocketFrameAggregator(WampHandlerConfiguration.MAX_WEBSOCKET_FRAME_SIZE), - new WampClientWebsocketHandler(handshaker), - connectionHandler); + + p.addLast(new HttpClientCodec()); + p.addLast(new HttpObjectAggregator(8192)); + + boolean keepAlive = nettyConfig.getPingPeriodSeconds() > 0 && nettyConfig.getPingTimeoutSeconds() > 0; + if (keepAlive) + { + p.addLast("keep-alive-idle-state-handler", new IdleStateHandler( + nettyConfig.getPingPeriodSeconds() + nettyConfig.getPingTimeoutSeconds() /*reader timeout*/, + nettyConfig.getPingPeriodSeconds() /*writer timeout*/, + 0)); + } + + p.addLast(new WebSocketClientProtocolHandler(handshaker, false)); + + if (keepAlive) + { + p.addLast("keep-alive-action-handler", new ChannelDuplexHandler() { + private boolean pingTried; + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + if (pingTried) { + pingTried = false; + } + super.read(ctx); + } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + if (e.state() == IdleState.WRITER_IDLE) { + ctx.writeAndFlush(new PingWebSocketFrame()); + } else if (e.state() == IdleState.READER_IDLE) { + if (!pingTried) { + ctx.writeAndFlush(new PingWebSocketFrame()); + pingTried = true; + } else { + ctx.close(); + } + } + } else { + ctx.fireUserEventTriggered(evt); + } + } + }); + } + + p.addLast(new WebSocketFrameAggregator(WampHandlerConfiguration.MAX_WEBSOCKET_FRAME_SIZE)); + p.addLast(new WampClientWebsocketHandler(handshaker)); + p.addLast(connectionHandler); } }); diff --git a/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/NettyWampConnectionConfig.java b/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/NettyWampConnectionConfig.java index 027f63c..66d80dd 100644 --- a/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/NettyWampConnectionConfig.java +++ b/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/NettyWampConnectionConfig.java @@ -7,12 +7,17 @@ public class NettyWampConnectionConfig implements IWampClientConnectionConfig { static final int DEFAULT_MAX_FRAME_PAYLOAD_LENGTH = 65535; - SslContext sslContext; - int maxFramePayloadLength; + final SslContext sslContext; + final int maxFramePayloadLength; - NettyWampConnectionConfig(SslContext sslContext, int maxFramePayloadLength) { + final int pingPeriodSeconds; + final int pingTimeoutSeconds; + + NettyWampConnectionConfig(SslContext sslContext, int maxFramePayloadLength, int pingPeriodSeconds, int pingTimeoutSeconds) { this.sslContext = sslContext; this.maxFramePayloadLength = maxFramePayloadLength; + this.pingPeriodSeconds = pingPeriodSeconds; + this.pingTimeoutSeconds = pingTimeoutSeconds; } /** @@ -28,6 +33,14 @@ public int getMaxFramePayloadLength() { return maxFramePayloadLength; } + public int getPingPeriodSeconds() { + return pingPeriodSeconds; + } + + public int getPingTimeoutSeconds() { + return pingTimeoutSeconds; + } + /** * Builder class that must be used to create a {@link NettyWampConnectionConfig} * instance. @@ -36,7 +49,9 @@ public static class Builder { SslContext sslContext; int maxFramePayloadLength = DEFAULT_MAX_FRAME_PAYLOAD_LENGTH; - + int pingPeriodSeconds; + int pingTimeoutSeconds; + /** * Allows to set the SslContext which will be used to create Ssl connections to the WAMP * router. If this is set to null a default (unsecure) SSL client context will be created @@ -57,8 +72,20 @@ public Builder withMaxFramePayloadLength(int maxFramePayloadLength){ return this; } + public Builder withKeepAlive(int pingPeriodSeconds, int pingTimeoutSeconds) { + if (pingPeriodSeconds <= 0) { + throw new IllegalArgumentException("pingPeriodSeconds parameter cannot be negative"); + } + if (pingTimeoutSeconds <= 0) { + throw new IllegalArgumentException("pingTimeoutSeconds parameter cannot be negative"); + } + this.pingPeriodSeconds = pingPeriodSeconds; + this.pingTimeoutSeconds = pingTimeoutSeconds; + return this; + } + public NettyWampConnectionConfig build() { - return new NettyWampConnectionConfig(sslContext, maxFramePayloadLength); + return new NettyWampConnectionConfig(sslContext, maxFramePayloadLength, pingPeriodSeconds, pingTimeoutSeconds); } } } diff --git a/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/WampServerWebsocketHandler.java b/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/WampServerWebsocketHandler.java index f96c49d..4a5c236 100644 --- a/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/WampServerWebsocketHandler.java +++ b/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/WampServerWebsocketHandler.java @@ -17,7 +17,9 @@ package ws.wamp.jawampa.transport.netty; import ws.wamp.jawampa.WampRouter; +import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -37,8 +39,10 @@ import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; import io.netty.handler.ssl.SslHandler; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.ReferenceCountUtil; -import io.netty.util.internal.StringUtil; import ws.wamp.jawampa.WampSerialization; import ws.wamp.jawampa.WampMessages.WampMessage; import ws.wamp.jawampa.connection.IWampConnection; @@ -61,6 +65,9 @@ public class WampServerWebsocketHandler extends ChannelInboundHandlerAdapter { final IWampConnectionAcceptor connectionAcceptor; final List supportedSerializations; + final int pingPeriodSeconds; + final int pingTimeoutSeconds; + WampSerialization serialization = WampSerialization.Invalid; boolean handshakeInProgress = false; @@ -69,11 +76,18 @@ public WampServerWebsocketHandler(String websocketPath, WampRouter router) { } public WampServerWebsocketHandler(String websocketPath, WampRouter router, - List supportedSerializations) { + List supportedSerializations) { + this(websocketPath, router, supportedSerializations, 0, 0); + } + + public WampServerWebsocketHandler(String websocketPath, WampRouter router, + List supportedSerializations, int pingPeriodSeconds, int pingTimeoutSeconds) { this.websocketPath = websocketPath; this.router = router; this.connectionAcceptor = router.connectionAcceptor(); this.supportedSerializations = supportedSerializations; + this.pingPeriodSeconds = pingPeriodSeconds; + this.pingTimeoutSeconds = pingTimeoutSeconds; } @Override @@ -109,7 +123,7 @@ private boolean isUpgradeRequest(FullHttpRequest request) { if (connectionHeaderValue == null) { return false; } - String[] connectionHeaderFields = StringUtil.split(connectionHeaderValue.toLowerCase(), ','); + String[] connectionHeaderFields = connectionHeaderValue.toLowerCase().split(","); boolean hasUpgradeField = false; for (String s : connectionHeaderFields) { if (s.trim().equals(HttpHeaders.Values.UPGRADE.toLowerCase())) { @@ -224,6 +238,11 @@ private void tryWebsocketHandshake(final ChannelHandlerContext ctx, FullHttpRequ if (last == null) { throw new IllegalStateException("Can't find the WAMP server handler in the pipeline"); } + + boolean keepAlive = pingPeriodSeconds > 0 && pingTimeoutSeconds > 0; + if (keepAlive) { + ctx.pipeline().addLast("keep-alive-idle-state-handler", new IdleStateHandler(pingPeriodSeconds + pingTimeoutSeconds /*reader timeout*/, pingPeriodSeconds /*writer timeout*/, 0)); + } // Remove the WampServerWebSocketHandler and replace it with the protocol handler // which processes pings and closes @@ -231,6 +250,37 @@ private void tryWebsocketHandshake(final ChannelHandlerContext ctx, FullHttpRequ ctx.pipeline().replace(this, "wamp-websocket-protocol-handler", protocolHandler); final ChannelHandlerContext protocolHandlerCtx = ctx.pipeline().context(protocolHandler); + if (keepAlive) { + ctx.pipeline().addLast("keep-alive-action-handler", new ChannelDuplexHandler() { + private boolean pingTried; + @Override + public void read(ChannelHandlerContext ctx) throws Exception { + if (pingTried) { + pingTried = false; + } + super.read(ctx); + } + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + if (e.state() == IdleState.WRITER_IDLE) { + ctx.writeAndFlush(new PingWebSocketFrame()); + } else if (e.state() == IdleState.READER_IDLE) { + if (!pingTried) { + ctx.writeAndFlush(new PingWebSocketFrame()); + pingTried = true; + } else { + ctx.close(); + } + } + } else { + ctx.fireUserEventTriggered(evt); + } + } + }); + } + // Handle websocket fragmentation before the deserializer protocolHandlerCtx.pipeline().addLast(new WebSocketFrameAggregator(WampHandlerConfiguration.MAX_WEBSOCKET_FRAME_SIZE)); @@ -372,7 +422,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception if (msg instanceof PingWebSocketFrame) { // Respond to Pings with Pongs try { - ctx.writeAndFlush(new PongWebSocketFrame()); + ByteBuf content = ((PingWebSocketFrame) msg).content().copy(); + ctx.writeAndFlush(new PongWebSocketFrame(content)); } finally { ((PingWebSocketFrame) msg).release(); }