Skip to content
This repository has been archived by the owner on May 22, 2019. It is now read-only.

log errors encountered #61

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions jawampa-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,16 @@
<artifactId>jackson-dataformat-msgpack</artifactId>
<version>0.7.0-p7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>1.7.12</version>
<optional>true</optional>
</dependency>
</dependencies>
</project>
4 changes: 2 additions & 2 deletions jawampa-core/src/main/java/ws/wamp/jawampa/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void replyError(String errorUri, ArrayNode arguments, ObjectNode keywordA
@Override
public void run() {
if (stateController.currentState() != session) return;
session.connectionController().sendMessage(msg, IWampConnectionPromise.Empty);
session.connectionController().sendMessage(msg, IWampConnectionPromise.LogError);
}
});
}
Expand All @@ -149,7 +149,7 @@ public void reply(ArrayNode arguments, ObjectNode keywordArguments) {
@Override
public void run() {
if (stateController.currentState() != session) return;
session.connectionController().sendMessage(msg, IWampConnectionPromise.Empty);
session.connectionController().sendMessage(msg, IWampConnectionPromise.LogError);
}
});
}
Expand Down
56 changes: 31 additions & 25 deletions jawampa-core/src/main/java/ws/wamp/jawampa/WampRouter.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;
Expand Down Expand Up @@ -59,6 +61,7 @@
* protocol.<br>
*/
public class WampRouter {
private final static Logger logger = LoggerFactory.getLogger(WampRouter.class);

final static Set<WampRoles> SUPPORTED_CLIENT_ROLES;
static {
Expand Down Expand Up @@ -138,7 +141,7 @@ void removeChannel(ClientHandler channel, boolean removeFromList) {
if (invoc.caller.state != RouterHandlerState.Open) continue;
ErrorMessage errMsg = new ErrorMessage(CallMessage.ID, invoc.callRequestId,
null, ApplicationError.NO_SUCH_PROCEDURE, null, null);
invoc.caller.controller.sendMessage(errMsg, IWampConnectionPromise.Empty);
invoc.caller.controller.sendMessage(errMsg, IWampConnectionPromise.LogError);
}
proc.pendingCalls.clear();
// Remove the procedure from the realm
Expand Down Expand Up @@ -316,7 +319,7 @@ public void run() {
ri.removeChannel(channel, false);
channel.markAsClosed();
GoodbyeMessage goodbye = new GoodbyeMessage(null, ApplicationError.SYSTEM_SHUTDOWN);
channel.controller.sendMessage(goodbye, IWampConnectionPromise.Empty);
channel.controller.sendMessage(goodbye, IWampConnectionPromise.LogError);
closeConnection(channel.controller, true);
}
ri.channelsBySessionId.clear();
Expand Down Expand Up @@ -356,7 +359,7 @@ public void run() {
|| newConnection == null) {
// This is always true if the transport provider does not manipulate the structure
// that was sent by the router
if (newConnection != null) newConnection.close(false, IWampConnectionPromise.Empty);
if (newConnection != null) newConnection.close(false, IWampConnectionPromise.LogError);
return;
}
QueueingConnectionController controller = (QueueingConnectionController)connectionListener;
Expand All @@ -379,7 +382,7 @@ public void run() {
Runnable r = new Runnable () {
@Override
public void run() {
newConnection.close(false, IWampConnectionPromise.Empty);
newConnection.close(false, IWampConnectionPromise.LogError);
}
};
ExecutorService executor = Executors.newSingleThreadExecutor();
Expand Down Expand Up @@ -434,6 +437,9 @@ public void transportClosed() {
@Override
public void transportError(Throwable cause) {
if (isDisposed || state != RouterHandlerState.Open) return;
if (cause!=null) {
logger.error("closing client because of transportError", cause);
}
if (realm != null) {
closeActiveClient(ClientHandler.this, null);
} else {
Expand Down Expand Up @@ -469,7 +475,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
// Echo the message in case of goodbye
if (msg instanceof GoodbyeMessage) {
GoodbyeMessage reply = new GoodbyeMessage(null, ApplicationError.GOODBYE_AND_OUT);
handler.controller.sendMessage(reply, IWampConnectionPromise.Empty);
handler.controller.sendMessage(reply, IWampConnectionPromise.LogError);
}
} else if (msg instanceof CallMessage) {
// The client wants to call a remote function
Expand All @@ -495,7 +501,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
if (err != null) { // If we have an error send that to the client
ErrorMessage errMsg = new ErrorMessage(CallMessage.ID, call.requestId,
null, err, null, null);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError);
return;
}

Expand All @@ -516,7 +522,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
// And send it to the provider
InvocationMessage imsg = new InvocationMessage(invoc.invocationRequestId,
proc.registrationId, null, call.arguments, call.argumentsKw);
proc.provider.controller.sendMessage(imsg, IWampConnectionPromise.Empty);
proc.provider.controller.sendMessage(imsg, IWampConnectionPromise.LogError);
} else if (msg instanceof YieldMessage) {
// The clients sends as the result of an RPC
// Verify the message
Expand All @@ -530,7 +536,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
invoc.procedure.pendingCalls.remove(invoc);
// Send the result to the original caller
ResultMessage result = new ResultMessage(invoc.callRequestId, null, yield.arguments, yield.argumentsKw);
invoc.caller.controller.sendMessage(result, IWampConnectionPromise.Empty);
invoc.caller.controller.sendMessage(result, IWampConnectionPromise.LogError);
} else if (msg instanceof ErrorMessage) {
ErrorMessage err = (ErrorMessage) msg;
if (!(IdValidator.isValidId(err.requestId))) {
Expand All @@ -555,7 +561,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
// Send the result to the original caller
ErrorMessage fwdError = new ErrorMessage(CallMessage.ID, invoc.callRequestId,
null, err.error, err.arguments, err.argumentsKw);
invoc.caller.controller.sendMessage(fwdError, IWampConnectionPromise.Empty);
invoc.caller.controller.sendMessage(fwdError, IWampConnectionPromise.LogError);
}
// else TODO: Are there any other possibilities where a client could return ERROR
} else if (msg instanceof RegisterMessage) {
Expand All @@ -582,7 +588,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
if (err != null) { // If we have an error send that to the client
ErrorMessage errMsg = new ErrorMessage(RegisterMessage.ID, reg.requestId,
null, err, null, null);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError);
return;
}

Expand All @@ -600,7 +606,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
handler.providedProcedures.put(procInfo.registrationId, procInfo);

RegisteredMessage response = new RegisteredMessage(reg.requestId, procInfo.registrationId);
handler.controller.sendMessage(response, IWampConnectionPromise.Empty);
handler.controller.sendMessage(response, IWampConnectionPromise.LogError);
} else if (msg instanceof UnregisterMessage) {
// The client wants to unregister a procedure
// Verify the message
Expand Down Expand Up @@ -628,7 +634,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
if (err != null) { // If we have an error send that to the client
ErrorMessage errMsg = new ErrorMessage(UnregisterMessage.ID, unreg.requestId,
null, err, null, null);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError);
return;
}

Expand All @@ -638,7 +644,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
if (invoc.caller.state == RouterHandlerState.Open) {
ErrorMessage errMsg = new ErrorMessage(CallMessage.ID, invoc.callRequestId,
null, ApplicationError.NO_SUCH_PROCEDURE, null, null);
invoc.caller.controller.sendMessage(errMsg, IWampConnectionPromise.Empty);
invoc.caller.controller.sendMessage(errMsg, IWampConnectionPromise.LogError);
}
}
proc.pendingCalls.clear();
Expand All @@ -654,7 +660,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m

// Send the acknowledge
UnregisteredMessage response = new UnregisteredMessage(unreg.requestId);
handler.controller.sendMessage(response, IWampConnectionPromise.Empty);
handler.controller.sendMessage(response, IWampConnectionPromise.LogError);
} else if (msg instanceof SubscribeMessage) {
// The client wants to subscribe to a procedure
// Verify the message
Expand Down Expand Up @@ -700,7 +706,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
if (err != null) { // If we have an error send that to the client
ErrorMessage errMsg = new ErrorMessage(SubscribeMessage.ID, sub.requestId,
null, err, null, null);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError);
return;
}

Expand Down Expand Up @@ -737,7 +743,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
}

SubscribedMessage response = new SubscribedMessage(sub.requestId, subscription.subscriptionId);
handler.controller.sendMessage(response, IWampConnectionPromise.Empty);
handler.controller.sendMessage(response, IWampConnectionPromise.LogError);
} else if (msg instanceof UnsubscribeMessage) {
// The client wants to cancel a subscription
// Verify the message
Expand All @@ -763,7 +769,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
if (err != null) { // If we have an error send that to the client
ErrorMessage errMsg = new ErrorMessage(UnsubscribeMessage.ID, unsub.requestId,
null, err, null, null);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError);
return;
}

Expand All @@ -784,7 +790,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m

// Send the acknowledge
UnsubscribedMessage response = new UnsubscribedMessage(unsub.requestId);
handler.controller.sendMessage(response, IWampConnectionPromise.Empty);
handler.controller.sendMessage(response, IWampConnectionPromise.LogError);
} else if (msg instanceof PublishMessage) {
// The client wants to publish something to all subscribers (apart from himself)
PublishMessage pub = (PublishMessage) msg;
Expand All @@ -810,7 +816,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m
ErrorMessage errMsg = new ErrorMessage(PublishMessage.ID, pub.requestId,
null, err, null, null);
if (sendAcknowledge) {
handler.controller.sendMessage(errMsg, IWampConnectionPromise.Empty);
handler.controller.sendMessage(errMsg, IWampConnectionPromise.LogError);
}
return;
}
Expand Down Expand Up @@ -852,7 +858,7 @@ private void onMessageFromRegisteredChannel(ClientHandler handler, WampMessage m

if (sendAcknowledge) {
PublishedMessage response = new PublishedMessage(pub.requestId, publicationId);
handler.controller.sendMessage(response, IWampConnectionPromise.Empty);
handler.controller.sendMessage(response, IWampConnectionPromise.LogError);
}
}
}
Expand Down Expand Up @@ -880,7 +886,7 @@ private void publishEvent(ClientHandler publisher, PublishMessage pub, long publ
}

// Publish the event to the subscriber
receiver.controller.sendMessage(ev, IWampConnectionPromise.Empty);
receiver.controller.sendMessage(ev, IWampConnectionPromise.LogError);
}
}

Expand Down Expand Up @@ -908,7 +914,7 @@ private void onMessageFromUnregisteredChannel(ClientHandler channelHandler, Wamp

if (errorMsg != null) {
AbortMessage abort = new AbortMessage(null, errorMsg);
channelHandler.controller.sendMessage(abort, IWampConnectionPromise.Empty);
channelHandler.controller.sendMessage(abort, IWampConnectionPromise.LogError);
return;
}

Expand All @@ -928,7 +934,7 @@ private void onMessageFromUnregisteredChannel(ClientHandler channelHandler, Wamp

if (roles.size() == 0 || hasUnsupportedRoles) {
AbortMessage abort = new AbortMessage(null, ApplicationError.NO_SUCH_ROLE);
channelHandler.controller.sendMessage(abort, IWampConnectionPromise.Empty);
channelHandler.controller.sendMessage(abort, IWampConnectionPromise.LogError);
return;
}

Expand All @@ -941,7 +947,7 @@ private void onMessageFromUnregisteredChannel(ClientHandler channelHandler, Wamp

// Respond with the WELCOME message
WelcomeMessage welcome = new WelcomeMessage(channelHandler.sessionId, realm.welcomeDetails);
channelHandler.controller.sendMessage(welcome, IWampConnectionPromise.Empty);
channelHandler.controller.sendMessage(welcome, IWampConnectionPromise.LogError);
}

private void closeActiveClient(ClientHandler channel, WampMessage closeMessage) {
Expand All @@ -952,7 +958,7 @@ private void closeActiveClient(ClientHandler channel, WampMessage closeMessage)

if (channel.controller != null) {
if (closeMessage != null)
channel.controller.sendMessage(closeMessage, IWampConnectionPromise.Empty);
channel.controller.sendMessage(closeMessage, IWampConnectionPromise.LogError);
closeConnection(channel.controller, true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void run() {
}
});
} catch (RejectedExecutionException e) {
connection.close(false, IWampConnectionPromise.Empty);
connection.close(false, IWampConnectionPromise.LogError);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ void closeIncompleteSession(Throwable disconnectReason, String optAbortReason, b
// Send abort to the remote
if (optAbortReason != null) {
AbortMessage msg = new AbortMessage(null, optAbortReason);
connectionController.sendMessage(msg, IWampConnectionPromise.Empty);
connectionController.sendMessage(msg, IWampConnectionPromise.LogError);
}

int nrReconnects = reconnectAllowed ? nrReconnectAttempts : 0;
Expand Down Expand Up @@ -122,7 +122,7 @@ void sendHelloMessage() {
// However the WAMP session is not established until the handshake was finished

connectionController
.sendMessage(new WampMessages.HelloMessage(stateController.clientConfig().realm(), stateController.clientConfig().helloDetails()), IWampConnectionPromise.Empty);
.sendMessage(new WampMessages.HelloMessage(stateController.clientConfig().realm(), stateController.clientConfig().helloDetails()), IWampConnectionPromise.LogError);
}

void onMessage(WampMessage msg) {
Expand Down Expand Up @@ -169,7 +169,7 @@ else if (msg instanceof ChallengeMessage) {
if (reply == null) {
handleProtocolError();
} else {
connectionController.sendMessage(reply, IWampConnectionPromise.Empty);
connectionController.sendMessage(reply, IWampConnectionPromise.LogError);
}
return;
}
Expand Down
Loading