From 1e933fb5b57158ca17bf18642c98309146f55795 Mon Sep 17 00:00:00 2001 From: Santhosh Kumar Tekuri Date: Fri, 3 Jul 2015 21:44:58 +0530 Subject: [PATCH] log errors encountered if wamp serializer/deserializer throws any error, then state remains in Connecting for-ever. no error in log. atleast logging will give some clue to users --- jawampa-core/pom.xml | 11 ++++ .../main/java/ws/wamp/jawampa/Request.java | 4 +- .../main/java/ws/wamp/jawampa/WampRouter.java | 56 ++++++++++--------- .../wamp/jawampa/client/ConnectingState.java | 2 +- .../wamp/jawampa/client/HandshakingState.java | 6 +- .../client/SessionEstablishedState.java | 16 +++--- .../connection/IWampConnectionPromise.java | 10 +++- .../netty/WampSerializationHandler.java | 1 + 8 files changed, 64 insertions(+), 42 deletions(-) diff --git a/jawampa-core/pom.xml b/jawampa-core/pom.xml index 272fb2b..2e85300 100644 --- a/jawampa-core/pom.xml +++ b/jawampa-core/pom.xml @@ -56,5 +56,16 @@ jackson-dataformat-msgpack 0.7.0-p7 + + org.slf4j + slf4j-api + 1.7.12 + + + org.slf4j + slf4j-jdk14 + 1.7.12 + true + diff --git a/jawampa-core/src/main/java/ws/wamp/jawampa/Request.java b/jawampa-core/src/main/java/ws/wamp/jawampa/Request.java index 6cb3dfe..bde42dc 100644 --- a/jawampa-core/src/main/java/ws/wamp/jawampa/Request.java +++ b/jawampa-core/src/main/java/ws/wamp/jawampa/Request.java @@ -126,7 +126,7 @@ public void replyError(String errorUri, ArrayNode arguments, ObjectNode keywordA @Override public void run() { if (stateController.currentState() != session) return; - session.connectionController().sendMessage(msg, IWampConnectionPromise.Empty); + session.connectionController().sendMessage(msg, IWampConnectionPromise.LogError); } }); } @@ -149,7 +149,7 @@ public void reply(ArrayNode arguments, ObjectNode keywordArguments) { @Override public void run() { if (stateController.currentState() != session) return; - session.connectionController().sendMessage(msg, IWampConnectionPromise.Empty); + session.connectionController().sendMessage(msg, IWampConnectionPromise.LogError); } }); } diff --git a/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java b/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java index 55672c2..11dfd6a 100644 --- a/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java +++ b/jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java @@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Observable; import rx.Scheduler; import rx.schedulers.Schedulers; @@ -59,6 +61,7 @@ * protocol.
*/ public class WampRouter { + private final static Logger logger = LoggerFactory.getLogger(WampRouter.class); final static Set SUPPORTED_CLIENT_ROLES; static { @@ -138,7 +141,7 @@ void removeChannel(ClientHandler channel, boolean removeFromList) { if (invoc.caller.state != RouterHandlerState.Open) continue; ErrorMessage errMsg = new ErrorMessage(CallMessage.ID, invoc.callRequestId, null, ApplicationError.NO_SUCH_PROCEDURE, null, null); - invoc.caller.controller.sendMessage(errMsg, IWampConnectionPromise.Empty); + invoc.caller.controller.sendMessage(errMsg, IWampConnectionPromise.LogError); } proc.pendingCalls.clear(); // Remove the procedure from the realm @@ -316,7 +319,7 @@ public void run() { ri.removeChannel(channel, false); channel.markAsClosed(); GoodbyeMessage goodbye = new GoodbyeMessage(null, ApplicationError.SYSTEM_SHUTDOWN); - channel.controller.sendMessage(goodbye, IWampConnectionPromise.Empty); + channel.controller.sendMessage(goodbye, IWampConnectionPromise.LogError); closeConnection(channel.controller, true); } ri.channelsBySessionId.clear(); @@ -356,7 +359,7 @@ public void run() { || newConnection == null) { // This is always true if the transport provider does not manipulate the structure // that was sent by the router - if (newConnection != null) newConnection.close(false, IWampConnectionPromise.Empty); + if (newConnection != null) newConnection.close(false, IWampConnectionPromise.LogError); return; } QueueingConnectionController controller = (QueueingConnectionController)connectionListener; @@ -379,7 +382,7 @@ public void run() { Runnable r = new Runnable () { @Override public void run() { - newConnection.close(false, IWampConnectionPromise.Empty); + newConnection.close(false, IWampConnectionPromise.LogError); } }; ExecutorService executor = Executors.newSingleThreadExecutor(); @@ -434,6 +437,9 @@ public void transportClosed() { @Override public void transportError(Throwable cause) { if (isDisposed || state != RouterHandlerState.Open) return; + if (cause!=null) { + logger.error("closing client because of transportError", cause); + } if (realm != null) { closeActiveClient(ClientHandler.this, null); } else { @@ -469,7 +475,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m // Echo the message in case of goodbye if (msg instanceof GoodbyeMessage) { GoodbyeMessage reply = new GoodbyeMessage(null, ApplicationError.GOODBYE_AND_OUT); - handler.controller.sendMessage(reply, IWampConnectionPromise.Empty); + handler.controller.sendMessage(reply, IWampConnectionPromise.LogError); } } else if (msg instanceof CallMessage) { // The client wants to call a remote function @@ -495,7 +501,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m if (err != null) { // If we have an error send that to the client ErrorMessage errMsg = new ErrorMessage(CallMessage.ID, call.requestId, null, err, null, null); - handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty); + handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError); return; } @@ -516,7 +522,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m // And send it to the provider InvocationMessage imsg = new InvocationMessage(invoc.invocationRequestId, proc.registrationId, null, call.arguments, call.argumentsKw); - proc.provider.controller.sendMessage(imsg, IWampConnectionPromise.Empty); + proc.provider.controller.sendMessage(imsg, IWampConnectionPromise.LogError); } else if (msg instanceof YieldMessage) { // The clients sends as the result of an RPC // Verify the message @@ -530,7 +536,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m invoc.procedure.pendingCalls.remove(invoc); // Send the result to the original caller ResultMessage result = new ResultMessage(invoc.callRequestId, null, yield.arguments, yield.argumentsKw); - invoc.caller.controller.sendMessage(result, IWampConnectionPromise.Empty); + invoc.caller.controller.sendMessage(result, IWampConnectionPromise.LogError); } else if (msg instanceof ErrorMessage) { ErrorMessage err = (ErrorMessage) msg; if (!(IdValidator.isValidId(err.requestId))) { @@ -555,7 +561,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m // Send the result to the original caller ErrorMessage fwdError = new ErrorMessage(CallMessage.ID, invoc.callRequestId, null, err.error, err.arguments, err.argumentsKw); - invoc.caller.controller.sendMessage(fwdError, IWampConnectionPromise.Empty); + invoc.caller.controller.sendMessage(fwdError, IWampConnectionPromise.LogError); } // else TODO: Are there any other possibilities where a client could return ERROR } else if (msg instanceof RegisterMessage) { @@ -582,7 +588,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m if (err != null) { // If we have an error send that to the client ErrorMessage errMsg = new ErrorMessage(RegisterMessage.ID, reg.requestId, null, err, null, null); - handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty); + handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError); return; } @@ -600,7 +606,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m handler.providedProcedures.put(procInfo.registrationId, procInfo); RegisteredMessage response = new RegisteredMessage(reg.requestId, procInfo.registrationId); - handler.controller.sendMessage(response, IWampConnectionPromise.Empty); + handler.controller.sendMessage(response, IWampConnectionPromise.LogError); } else if (msg instanceof UnregisterMessage) { // The client wants to unregister a procedure // Verify the message @@ -628,7 +634,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m if (err != null) { // If we have an error send that to the client ErrorMessage errMsg = new ErrorMessage(UnregisterMessage.ID, unreg.requestId, null, err, null, null); - handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty); + handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError); return; } @@ -638,7 +644,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m if (invoc.caller.state == RouterHandlerState.Open) { ErrorMessage errMsg = new ErrorMessage(CallMessage.ID, invoc.callRequestId, null, ApplicationError.NO_SUCH_PROCEDURE, null, null); - invoc.caller.controller.sendMessage(errMsg, IWampConnectionPromise.Empty); + invoc.caller.controller.sendMessage(errMsg, IWampConnectionPromise.LogError); } } proc.pendingCalls.clear(); @@ -654,7 +660,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m // Send the acknowledge UnregisteredMessage response = new UnregisteredMessage(unreg.requestId); - handler.controller.sendMessage(response, IWampConnectionPromise.Empty); + handler.controller.sendMessage(response, IWampConnectionPromise.LogError); } else if (msg instanceof SubscribeMessage) { // The client wants to subscribe to a procedure // Verify the message @@ -700,7 +706,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m if (err != null) { // If we have an error send that to the client ErrorMessage errMsg = new ErrorMessage(SubscribeMessage.ID, sub.requestId, null, err, null, null); - handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty); + handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError); return; } @@ -737,7 +743,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m } SubscribedMessage response = new SubscribedMessage(sub.requestId, subscription.subscriptionId); - handler.controller.sendMessage(response, IWampConnectionPromise.Empty); + handler.controller.sendMessage(response, IWampConnectionPromise.LogError); } else if (msg instanceof UnsubscribeMessage) { // The client wants to cancel a subscription // Verify the message @@ -763,7 +769,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m if (err != null) { // If we have an error send that to the client ErrorMessage errMsg = new ErrorMessage(UnsubscribeMessage.ID, unsub.requestId, null, err, null, null); - handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty); + handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError); return; } @@ -784,7 +790,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m // Send the acknowledge UnsubscribedMessage response = new UnsubscribedMessage(unsub.requestId); - handler.controller.sendMessage(response, IWampConnectionPromise.Empty); + handler.controller.sendMessage(response, IWampConnectionPromise.LogError); } else if (msg instanceof PublishMessage) { // The client wants to publish something to all subscribers (apart from himself) PublishMessage pub = (PublishMessage) msg; @@ -810,7 +816,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m ErrorMessage errMsg = new ErrorMessage(PublishMessage.ID, pub.requestId, null, err, null, null); if (sendAcknowledge) { - handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty); + handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError); } return; } @@ -852,7 +858,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m if (sendAcknowledge) { PublishedMessage response = new PublishedMessage(pub.requestId, publicationId); - handler.controller.sendMessage(response, IWampConnectionPromise.Empty); + handler.controller.sendMessage(response, IWampConnectionPromise.LogError); } } } @@ -880,7 +886,7 @@ private void publishEvent(ClientHandler publisher, PublishMessage pub, long publ } // Publish the event to the subscriber - receiver.controller.sendMessage(ev, IWampConnectionPromise.Empty); + receiver.controller.sendMessage(ev, IWampConnectionPromise.LogError); } } @@ -908,7 +914,7 @@ private void onMessageFromUnregisteredChannel(ClientHandler channelHandler, Wamp if (errorMsg != null) { AbortMessage abort = new AbortMessage(null, errorMsg); - channelHandler.controller.sendMessage(abort, IWampConnectionPromise.Empty); + channelHandler.controller.sendMessage(abort, IWampConnectionPromise.LogError); return; } @@ -928,7 +934,7 @@ private void onMessageFromUnregisteredChannel(ClientHandler channelHandler, Wamp if (roles.size() == 0 || hasUnsupportedRoles) { AbortMessage abort = new AbortMessage(null, ApplicationError.NO_SUCH_ROLE); - channelHandler.controller.sendMessage(abort, IWampConnectionPromise.Empty); + channelHandler.controller.sendMessage(abort, IWampConnectionPromise.LogError); return; } @@ -941,7 +947,7 @@ private void onMessageFromUnregisteredChannel(ClientHandler channelHandler, Wamp // Respond with the WELCOME message WelcomeMessage welcome = new WelcomeMessage(channelHandler.sessionId, realm.welcomeDetails); - channelHandler.controller.sendMessage(welcome, IWampConnectionPromise.Empty); + channelHandler.controller.sendMessage(welcome, IWampConnectionPromise.LogError); } private void closeActiveClient(ClientHandler channel, WampMessage closeMessage) { @@ -952,7 +958,7 @@ private void closeActiveClient(ClientHandler channel, WampMessage closeMessage) if (channel.controller != null) { if (closeMessage != null) - channel.controller.sendMessage(closeMessage, IWampConnectionPromise.Empty); + channel.controller.sendMessage(closeMessage, IWampConnectionPromise.LogError); closeConnection(channel.controller, true); } } diff --git a/jawampa-core/src/main/java/ws/wamp/jawampa/client/ConnectingState.java b/jawampa-core/src/main/java/ws/wamp/jawampa/client/ConnectingState.java index 2d779ac..6b7d95a 100644 --- a/jawampa-core/src/main/java/ws/wamp/jawampa/client/ConnectingState.java +++ b/jawampa-core/src/main/java/ws/wamp/jawampa/client/ConnectingState.java @@ -102,7 +102,7 @@ public void run() { } }); } catch (RejectedExecutionException e) { - connection.close(false, IWampConnectionPromise.Empty); + connection.close(false, IWampConnectionPromise.LogError); } } diff --git a/jawampa-core/src/main/java/ws/wamp/jawampa/client/HandshakingState.java b/jawampa-core/src/main/java/ws/wamp/jawampa/client/HandshakingState.java index a70a9f7..63733f6 100644 --- a/jawampa-core/src/main/java/ws/wamp/jawampa/client/HandshakingState.java +++ b/jawampa-core/src/main/java/ws/wamp/jawampa/client/HandshakingState.java @@ -74,7 +74,7 @@ void closeIncompleteSession(Throwable disconnectReason, String optAbortReason, b // Send abort to the remote if (optAbortReason != null) { AbortMessage msg = new AbortMessage(null, optAbortReason); - connectionController.sendMessage(msg, IWampConnectionPromise.Empty); + connectionController.sendMessage(msg, IWampConnectionPromise.LogError); } int nrReconnects = reconnectAllowed ? nrReconnectAttempts : 0; @@ -122,7 +122,7 @@ void sendHelloMessage() { // However the WAMP session is not established until the handshake was finished connectionController - .sendMessage(new WampMessages.HelloMessage(stateController.clientConfig().realm(), stateController.clientConfig().helloDetails()), IWampConnectionPromise.Empty); + .sendMessage(new WampMessages.HelloMessage(stateController.clientConfig().realm(), stateController.clientConfig().helloDetails()), IWampConnectionPromise.LogError); } void onMessage(WampMessage msg) { @@ -169,7 +169,7 @@ else if (msg instanceof ChallengeMessage) { if (reply == null) { handleProtocolError(); } else { - connectionController.sendMessage(reply, IWampConnectionPromise.Empty); + connectionController.sendMessage(reply, IWampConnectionPromise.LogError); } return; } diff --git a/jawampa-core/src/main/java/ws/wamp/jawampa/client/SessionEstablishedState.java b/jawampa-core/src/main/java/ws/wamp/jawampa/client/SessionEstablishedState.java index 1f30266..0b54968 100644 --- a/jawampa-core/src/main/java/ws/wamp/jawampa/client/SessionEstablishedState.java +++ b/jawampa-core/src/main/java/ws/wamp/jawampa/client/SessionEstablishedState.java @@ -178,7 +178,7 @@ void closeSession(Throwable disconnectReason, String optCloseMessageReason, bool // Send goodbye message with close reason to the remote if (optCloseMessageReason != null) { GoodbyeMessage msg = new GoodbyeMessage(null, optCloseMessageReason); - connectionController.sendMessage(msg, IWampConnectionPromise.Empty); + connectionController.sendMessage(msg, IWampConnectionPromise.LogError); } stateController.setExternalState(new WampClient.DisconnectedState(disconnectReason)); @@ -402,7 +402,7 @@ else if (msg instanceof InvocationMessage) { connectionController.sendMessage( new ErrorMessage(InvocationMessage.ID, m.requestId, null, ApplicationError.NO_SUCH_PROCEDURE, null, null), - IWampConnectionPromise.Empty); + IWampConnectionPromise.LogError); } else { // Send the request to the subscriber, which can then send responses @@ -442,7 +442,7 @@ public void performPublish(final String topic, final EnumSet flags final WampMessages.PublishMessage msg = new WampMessages.PublishMessage(requestId, options, topic, arguments, argumentsKw); - connectionController.sendMessage(msg, IWampConnectionPromise.Empty); + connectionController.sendMessage(msg, IWampConnectionPromise.LogError); } public void performCall(final String procedure, @@ -465,7 +465,7 @@ public void performCall(final String procedure, arguments, argumentsKw); requestMap.put(requestId, new RequestMapEntry(CallMessage.ID, resultSubject)); - connectionController.sendMessage(callMsg, IWampConnectionPromise.Empty); + connectionController.sendMessage(callMsg, IWampConnectionPromise.LogError); } public void performRegisterProcedure(final String topic, final Subscriber subscriber) { @@ -526,7 +526,7 @@ public void call(Throwable t1) { requestMap.put(requestId, new RequestMapEntry(RegisterMessage.ID, registerFuture)); - connectionController.sendMessage(msg, IWampConnectionPromise.Empty); + connectionController.sendMessage(msg, IWampConnectionPromise.LogError); } /** @@ -573,7 +573,7 @@ public void call(Throwable t1) { requestMap.put(requestId, new RequestMapEntry( UnregisterMessage.ID, unregisterFuture)); - connectionController.sendMessage(msg, IWampConnectionPromise.Empty); + connectionController.sendMessage(msg, IWampConnectionPromise.LogError); } }); } @@ -657,7 +657,7 @@ public void call(Throwable t1) { requestMap.put(requestId, new RequestMapEntry(SubscribeMessage.ID, subscribeFuture)); - connectionController.sendMessage(msg, IWampConnectionPromise.Empty); + connectionController.sendMessage(msg, IWampConnectionPromise.LogError); } } @@ -709,7 +709,7 @@ public void call(Throwable t1) { requestMap.put(requestId, new RequestMapEntry( UnsubscribeMessage.ID, unsubscribeFuture)); - connectionController.sendMessage(msg, IWampConnectionPromise.Empty); + connectionController.sendMessage(msg, IWampConnectionPromise.LogError); } } }); diff --git a/jawampa-core/src/main/java/ws/wamp/jawampa/connection/IWampConnectionPromise.java b/jawampa-core/src/main/java/ws/wamp/jawampa/connection/IWampConnectionPromise.java index e53da66..1585b76 100644 --- a/jawampa-core/src/main/java/ws/wamp/jawampa/connection/IWampConnectionPromise.java +++ b/jawampa-core/src/main/java/ws/wamp/jawampa/connection/IWampConnectionPromise.java @@ -1,7 +1,9 @@ package ws.wamp.jawampa.connection; -public interface IWampConnectionPromise extends IWampConnectionFuture { +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +public interface IWampConnectionPromise extends IWampConnectionFuture { void fulfill(T value); void reject(Throwable error); @@ -14,7 +16,9 @@ public interface IWampConnectionPromise extends IWampConnectionFuture { * A default implementation of the promise whose instance methods do nothing.
* Can be used in cases where the caller is not interested in the call results. */ - public static final IWampConnectionPromise Empty = new IWampConnectionPromise() { + public static final IWampConnectionPromise LogError = new IWampConnectionPromise() { + private final Logger logger = LoggerFactory.getLogger(IWampConnectionPromise.class); + @Override public Void result() { return null; @@ -32,7 +36,7 @@ public void fulfill(Void value) { @Override public void reject(Throwable error) { - + logger.error("promise rejected", error); } @Override diff --git a/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/WampSerializationHandler.java b/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/WampSerializationHandler.java index 28863c3..771ee44 100644 --- a/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/WampSerializationHandler.java +++ b/jawampa-netty/src/main/java/ws/wamp/jawampa/transport/netty/WampSerializationHandler.java @@ -66,6 +66,7 @@ protected void encode(ChannelHandlerContext ctx, WampMessage msg, List o } } catch (Exception e) { + logger.error("error in serializing wampMessage", e); msgBuffer.release(); return; }