Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WICKET-6954] implement server side heart-beat + client reconnection in case of inactivity #499

Open
wants to merge 4 commits into
base: wicket-9.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.wicket.protocol.https.HttpsConfig;
import org.apache.wicket.protocol.https.HttpsMapper;
import org.apache.wicket.protocol.ws.WebSocketSettings;
import org.apache.wicket.protocol.ws.api.IWebSocketSession;
import org.apache.wicket.protocol.ws.api.IWebSocketSessionConfigurer;
import org.apache.wicket.protocol.ws.timer.HeartBeatWithReconnectTimer;
import org.apache.wicket.protocol.ws.timer.PingPongHeartBeatTimer;
import org.apache.wicket.request.Request;
import org.apache.wicket.request.Response;
import org.slf4j.Logger;
Expand All @@ -43,6 +43,7 @@ public class JSR356Application extends WicketExampleApplication
private static final Logger LOGGER = LoggerFactory.getLogger(JSR356Application.class);

private ScheduledExecutorService scheduledExecutorService;
private HeartBeatWithReconnectTimer heartBeatWithReconnectTimer;

@Override
public Class<HomePage> getHomePage()
Expand Down Expand Up @@ -83,6 +84,10 @@ public void init()
webSocketSettings.setSecurePort(8443);
}

webSocketSettings.setUseHeartBeat(true);
webSocketSettings.setReconnectOnFailure(true);
heartBeatWithReconnectTimer = new HeartBeatWithReconnectTimer(webSocketSettings);
heartBeatWithReconnectTimer.start(this);
// The websocket example loads JS from ajax.googleapis.com, which is not allowed by the CSP.
// This now serves as an example on how to disable CSP
getCspSettings().blocking().disabled();
Expand All @@ -97,7 +102,7 @@ public Session newSession(Request request, Response response)
@Override
protected void onDestroy() {
scheduledExecutorService.shutdownNow();

heartBeatWithReconnectTimer.stop();
super.onDestroy();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,14 @@ protected void onDownloadFailed(AjaxRequestTarget target)
private static final long serialVersionUID = 1L;

@Override
protected void onConnect(ConnectedMessage message)
{
protected void onConnect(ConnectedMessage message) {
super.onConnect(message);

ScheduledExecutorService service = JSR356Application.get().getScheduledExecutorService();
ChartUpdater.start(message, service);
if (!message.isReconnected())
{
ScheduledExecutorService service = JSR356Application.get().getScheduledExecutorService();
ChartUpdater.start(message, service);
}
}
});
add(downloadingContainer.setOutputMarkupPlaceholderTag(true).setVisible(false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.http.HttpServletRequest;
import java.time.Duration;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import javax.servlet.http.HttpServletRequest;

/**
* Web Socket related settings.
*
Expand Down Expand Up @@ -88,6 +90,7 @@ public class WebSocketSettings
private final AtomicInteger port = new AtomicInteger();
private final AtomicInteger securePort = new AtomicInteger();


/**
* Holds this WebSocketSettings in the Application's metadata.
* This way wicket-core module doesn't have reference to wicket-native-websocket.
Expand Down Expand Up @@ -139,6 +142,53 @@ public static void set(Application application, WebSocketSettings settings)
*/
private IWebSocketConnectionFilter connectionFilter;

/**
* Boolean used to determine if server side custom heart beat will be used.
* Use this in combination with {@code reconnectOnFailure = true) in order
* to
*/
private boolean useHeartBeat = false;


/**
* Boolean used to determine if ping-pong heart beat will be used. Mind that
* standard ping-pong does not specify a client side "onping" and
* pong message might be generated by browser itself. Thus, this can't be
*/
private boolean usePingPongHeartBeat = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference with useHeartBeat ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems ping-pong is designed for the following use case:

-server sends pings to all remote client peers and clients respond with a pong
-server can use this to determine some connection is dead and kill it
-it seems the standard does not include a public onping at client side. Thus browser might respond to this with a pong. Chrome responds with a pong but something like

ws.onping = function() {}
is not called.

Some implementations defines "onping" at client side. See

https://github.com/websockets/ws#how-to-detect-and-close-broken-connections

I have include a class that starts a ping-pong scheduler. But this can't be used a client side in order to reconnect. This uses usePingPongHeartBeat flag.

I have included another timer that send special byte array... This one uses useHeartBeat and is the one used to reconnect broken web-scoket

Not sure if we should keep ping-pong timer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


/**
* Flag used to determine if client will try to reconnect in case of ping-pong failure
*/
private boolean reconnectOnFailure = false;

/**
* In case ping or remote client immediately fails, this determines how many times ping
* will be retried before connection it terminated.
*/
private int maxPingRetries = 3;

/**
* Periodicity by which the heartbeat timer will kick.
*/
private long heartBeatPace = Duration.ofSeconds(15).toMillis();

/**
* The max threshold assumed for network latency.
*/
private long networkLatencyThreshold = Duration.ofSeconds(2).toMillis();

/**
* The executor that handles delivering pings to remote peers and checking if peers have
* correctly ponged back (and terminates connections in case not).
*/
private Executor heartBeatsExecutor = new HeartBeatsExecutor();

/**
* Whether messages are broadcast when receiving pong messages
*/
private boolean sendMessagesOnPong = false;

/**
* A {@link org.apache.wicket.protocol.ws.api.IWebSocketSessionConfigurer} that allows to configure
* {@link org.apache.wicket.protocol.ws.api.IWebSocketSession}s.
Expand Down Expand Up @@ -424,6 +474,96 @@ public Integer getSecurePort()
return securePort.get();
}

public void setUseHeartBeat(boolean useHeartBeat)
{
this.useHeartBeat = useHeartBeat;
}

public boolean isUseHeartBeat()
{
return useHeartBeat;
}

public void setReconnectOnFailure(boolean reconnectOnFailure)
{
this.reconnectOnFailure = reconnectOnFailure;
}

public boolean isReconnectOnFailure()
{
return reconnectOnFailure;
}

public long getHeartBeatPace()
{
return heartBeatPace;
}

public void setHeartBeatPace(long heartBeatPace)
{
this.heartBeatPace = heartBeatPace;
}

public void setHeartBeatPace(Duration heartBeatPace)
{
this.heartBeatPace = heartBeatPace.toMillis();
}

public long getNetworkLatencyThreshold()
{
return networkLatencyThreshold;
}

public void setNetworkLatencyThreshold(long networkLatencyThreshold)
{
this.networkLatencyThreshold = networkLatencyThreshold;
}

public void setNetworkLatencyThreshold(Duration networkLatencyThreshold)
{
this.networkLatencyThreshold = networkLatencyThreshold.toMillis();
}

public void setHeartBeatsExecutor(Executor heartBeatsExecutor)
{
this.heartBeatsExecutor = heartBeatsExecutor;
}

public void setMaxPingRetries(int maxPingRetries)
{
this.maxPingRetries = maxPingRetries;
}

public void setSendMessagesOnPong(boolean sendMessagesOnPong)
{
this.sendMessagesOnPong = sendMessagesOnPong;
}

public boolean isSendMessagesOnPong()
{
return sendMessagesOnPong;
}

public Executor getHeartBeatsExecutor()
{
return heartBeatsExecutor;
}

public int getMaxPingRetries()
{
return maxPingRetries;
}

public void setUsePingPongHeartBeat(boolean usePingPongHeartBeat)
{
this.usePingPongHeartBeat = usePingPongHeartBeat;
}

public boolean isUsePingPongHeartBeat()
{
return usePingPongHeartBeat;
}

/**
* Simple executor that runs the tasks in the caller thread.
*/
Expand Down Expand Up @@ -487,6 +627,33 @@ public void run(final Runnable command)
}
}

public static class HeartBeatsExecutor implements Executor
{

private final java.util.concurrent.Executor executor;


public HeartBeatsExecutor()
{
this(new ThreadPoolExecutor(1, 8,
60L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new ThreadFactory()));
}

public HeartBeatsExecutor(java.util.concurrent.Executor executor)
{
this.executor = executor;

}

@Override
public void run(final Runnable command)
{
executor.execute(command);
}
}

public static class ThreadFactory implements java.util.concurrent.ThreadFactory
{
private final AtomicInteger counter = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.wicket.protocol.ws.api;

import java.nio.ByteBuffer;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

Expand Down Expand Up @@ -45,6 +47,7 @@
import org.apache.wicket.protocol.ws.api.message.ErrorMessage;
import org.apache.wicket.protocol.ws.api.message.IWebSocketMessage;
import org.apache.wicket.protocol.ws.api.message.IWebSocketPushMessage;
import org.apache.wicket.protocol.ws.api.message.PongMessage;
import org.apache.wicket.protocol.ws.api.message.TextMessage;
import org.apache.wicket.protocol.ws.api.registry.IKey;
import org.apache.wicket.protocol.ws.api.registry.IWebSocketConnectionRegistry;
Expand Down Expand Up @@ -81,7 +84,6 @@ public abstract class AbstractWebSocketProcessor implements IWebSocketProcessor
* A pageId indicating that the endpoint is WebSocketResource
*/
static final int NO_PAGE_ID = -1;
static final String NO_PAGE_CLASS = "_NO_PAGE";

private final WebRequest webRequest;
private final int pageId;
Expand Down Expand Up @@ -148,6 +150,24 @@ public AbstractWebSocketProcessor(final HttpServletRequest request, final WebApp
this.connectionFilter = webSocketSettings.getConnectionFilter();
}

@Override
public void onPong(ByteBuffer byteBuffer)
{
IKey key = getRegistryKey();
WebApplication application = getApplication();
String sessionId = getSessionId();
IWebSocketConnection webSocketConnection = connectionRegistry.getConnection(application, sessionId, key);
if (webSocketConnection != null)
{
webSocketConnection.onPong(byteBuffer);
if (webSocketSettings.isSendMessagesOnPong())
{
// if we want to deliver messages on pong deliver them
broadcastMessage(new PongMessage(application, sessionId, key, byteBuffer));
}
}
}

@Override
public void onMessage(final String message)
{
Expand All @@ -168,7 +188,7 @@ public void onMessage(byte[] data, int offset, int length)
* the web socket connection to use to communicate with the client
* @see #onOpen(Object)
*/
protected final void onConnect(final IWebSocketConnection connection) {
protected final void onConnect(final IWebSocketConnection connection, boolean reconnected) {
IKey key = getRegistryKey();
connectionRegistry.setConnection(getApplication(), getSessionId(), key, connection);

Expand All @@ -184,7 +204,7 @@ protected final void onConnect(final IWebSocketConnection connection) {
}
}

broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key), connection);
broadcastMessage(new ConnectedMessage(getApplication(), getSessionId(), key, reconnected), connection);
}

@Override
Expand All @@ -202,7 +222,9 @@ public void onError(Throwable t)
{
if (webSocketSettings.shouldNotifyOnErrorEvent(t)) {
IKey key = getRegistryKey();
broadcastMessage(new ErrorMessage(getApplication(), getSessionId(), key, t));
IWebSocketConnection connection = connectionRegistry.getConnection(application, sessionId, key);
ErrorMessage message = new ErrorMessage(application, sessionId, key, t);
broadcastMessage(message, connection);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ private Map<String, Object> getParameters(Component component) {
Integer securePort = getSecurePort(webSocketSettings);
variables.put("securePort", securePort);

variables.put("useHeartBeat", isUseHeartBeat(webSocketSettings));

variables.put("reconnectOnFailure", isReconnectOnFailure(webSocketSettings));

variables.put("heartBeatPace", getHeartBeatPace(webSocketSettings));

variables.put("networkLatencyThreshold", getNetworkLatencyThreshold(webSocketSettings));

CharSequence contextPath = getContextPath(webSocketSettings);
Args.notNull(contextPath, "contextPath");
variables.put("contextPath", contextPath);
Expand All @@ -177,6 +185,26 @@ private Map<String, Object> getParameters(Component component) {
return variables;
}

protected boolean isUseHeartBeat(WebSocketSettings webSocketSettings)
{
return webSocketSettings.isUseHeartBeat();
}

protected boolean isReconnectOnFailure(WebSocketSettings webSocketSettings)
{
return webSocketSettings.isReconnectOnFailure();
}

protected long getHeartBeatPace(WebSocketSettings webSocketSettings)
{
return webSocketSettings.getHeartBeatPace();
}

protected long getNetworkLatencyThreshold(WebSocketSettings webSocketSettings)
{
return webSocketSettings.getNetworkLatencyThreshold();
}

protected Integer getPort(WebSocketSettings webSocketSettings)
{
return webSocketSettings.getPort();
Expand Down
Loading