From daa5dc90533657e0c576f2cb8cdcbfb75d9daecb Mon Sep 17 00:00:00 2001 From: Dimitris Papavasiliou Date: Sun, 23 Sep 2018 16:25:38 +0300 Subject: [PATCH] Detect and recycle dangling websockets. Co-authored-by: Dimitris Papavasiliou Co-authored-by: Ralf Kohrt --- .../api/util/RealtimeSleepTimer.java | 13 +--- .../api/SignalServiceMessagePipe.java | 4 +- .../websocket/WebSocketConnection.java | 77 +++++++++++-------- 3 files changed, 51 insertions(+), 43 deletions(-) diff --git a/android/src/main/java/org/whispersystems/signalservice/api/util/RealtimeSleepTimer.java b/android/src/main/java/org/whispersystems/signalservice/api/util/RealtimeSleepTimer.java index 55c2eb7d85..6b93aaa86e 100644 --- a/android/src/main/java/org/whispersystems/signalservice/api/util/RealtimeSleepTimer.java +++ b/android/src/main/java/org/whispersystems/signalservice/api/util/RealtimeSleepTimer.java @@ -31,21 +31,14 @@ public RealtimeSleepTimer(Context context) { } @Override - public void sleep(long millis) { + public void sleep(long millis) throws InterruptedException { context.registerReceiver(alarmReceiver, new IntentFilter(AlarmReceiver.WAKE_UP_THREAD_ACTION)); - final long startTime = System.currentTimeMillis(); alarmReceiver.setAlarm(millis); - while (System.currentTimeMillis() - startTime < millis) { - try { - synchronized (this) { - wait(millis - System.currentTimeMillis() + startTime); - } - } catch (InterruptedException e) { - Log.w(TAG, e); - } + synchronized (this) { + wait(millis); } context.unregisterReceiver(alarmReceiver); diff --git a/java/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java b/java/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java index 92daad698e..143ca24cfc 100644 --- a/java/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java +++ b/java/src/main/java/org/whispersystems/signalservice/api/SignalServiceMessagePipe.java @@ -68,7 +68,7 @@ public class SignalServiceMessagePipe { * @throws TimeoutException */ public SignalServiceEnvelope read(long timeout, TimeUnit unit) - throws InvalidVersionException, IOException, TimeoutException + throws InvalidVersionException, IOException, TimeoutException, InterruptedException { return read(timeout, unit, new NullMessagePipeCallback()); } @@ -91,7 +91,7 @@ public SignalServiceEnvelope read(long timeout, TimeUnit unit) * @throws InvalidVersionException */ public SignalServiceEnvelope read(long timeout, TimeUnit unit, MessagePipeCallback callback) - throws TimeoutException, IOException, InvalidVersionException + throws TimeoutException, IOException, InvalidVersionException, InterruptedException { if (!credentialsProvider.isPresent()) { throw new IllegalArgumentException("You can't read messages if you haven't specified credentials"); diff --git a/java/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java b/java/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java index 7075cf4305..69880ae18d 100644 --- a/java/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java +++ b/java/src/main/java/org/whispersystems/signalservice/internal/websocket/WebSocketConnection.java @@ -20,6 +20,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -127,22 +128,20 @@ public synchronized void disconnect() { } if (keepAliveSender != null) { - keepAliveSender.shutdown(); + keepAliveSender.interrupt(); keepAliveSender = null; } } public synchronized WebSocketRequestMessage readRequest(long timeoutMillis) - throws TimeoutException, IOException + throws TimeoutException, IOException, InterruptedException { if (client == null) { throw new IOException("Connection closed!"); } - long startTime = System.currentTimeMillis(); - - while (client != null && incomingRequests.isEmpty() && elapsedTime(startTime) < timeoutMillis) { - Util.wait(this, Math.max(1, timeoutMillis - elapsedTime(startTime))); + if (client != null && incomingRequests.isEmpty()) { + wait(timeoutMillis); } if (incomingRequests.isEmpty() && client == null) throw new IOException("Connection closed!"); @@ -183,20 +182,16 @@ public synchronized void sendResponse(WebSocketResponseMessage response) throws } } - private synchronized void sendKeepAlive() throws IOException { + private synchronized Future> sendKeepAlive() throws IOException { if (keepAliveSender != null && client != null) { - byte[] message = WebSocketMessage.newBuilder() - .setType(WebSocketMessage.Type.REQUEST) - .setRequest(WebSocketRequestMessage.newBuilder() - .setId(System.currentTimeMillis()) - .setPath("/v1/keepalive") - .setVerb("GET") - .build()).build() - .toByteArray(); - - if (!client.send(ByteString.of(message))) { - throw new IOException("Write failed!"); - } + WebSocketRequestMessage request = WebSocketRequestMessage.newBuilder() + .setId(System.currentTimeMillis()) + .setPath("/v1/keepalive") + .setVerb("GET") + .build(); + return sendRequest(request); + } else { + return null; } } @@ -249,7 +244,7 @@ public synchronized void onClosed(WebSocket webSocket, int code, String reason) } if (keepAliveSender != null) { - keepAliveSender.shutdown(); + keepAliveSender.interrupt(); keepAliveSender = null; } @@ -312,23 +307,43 @@ private Pair createTlsSocketFactory(TrustSto private class KeepAliveSender extends Thread { - private AtomicBoolean stop = new AtomicBoolean(false); - public void run() { - while (!stop.get()) { + Future future = null; + boolean severed = false; + + while (!interrupted()) { try { sleepTimer.sleep(TimeUnit.SECONDS.toMillis(KEEPALIVE_TIMEOUT_SECONDS)); - Log.w(TAG, "Sending keep alive..."); - sendKeepAlive(); - } catch (Throwable e) { - Log.w(TAG, e); + if (future != null) { + try { + future.get(0L, TimeUnit.SECONDS); + } catch (ExecutionException | TimeoutException e){ + severed = true; + } + } + } catch (InterruptedException e) { + Log.d(TAG, "Keep alive sender interrupted; exiting loop."); + break; } - } - } - public void shutdown() { - stop.set(true); + if (severed) { + Log.d(TAG, "No response to previous keep-alive; forcing new connection."); + + disconnect(); + synchronized(WebSocketConnection.this) { + WebSocketConnection.this.notifyAll(); + } + } else { + Log.d(TAG, "Sending keep alive..."); + + try { + future = sendKeepAlive(); + } catch (IOException e) { + Log.d(TAG, "Failed to send keep alive: " + e.getMessage()); + } + } + } } }