Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Detect and recycle dangling websockets.
Browse files Browse the repository at this point in the history
Co-authored-by: Dimitris Papavasiliou <[email protected]>
Co-authored-by: Ralf Kohrt <[email protected]>
  • Loading branch information
Dimitris Papavasiliou and Ralf Kohrt committed May 4, 2019
1 parent da3a1e2 commit daa5dc9
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,14 @@ public RealtimeSleepTimer(Context context) {
}

@Override
public void sleep(long millis) {
public void sleep(long millis) throws InterruptedException {
context.registerReceiver(alarmReceiver,
new IntentFilter(AlarmReceiver.WAKE_UP_THREAD_ACTION));

final long startTime = System.currentTimeMillis();
alarmReceiver.setAlarm(millis);

while (System.currentTimeMillis() - startTime < millis) {
try {
synchronized (this) {
wait(millis - System.currentTimeMillis() + startTime);
}
} catch (InterruptedException e) {
Log.w(TAG, e);
}
synchronized (this) {
wait(millis);
}

context.unregisterReceiver(alarmReceiver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class SignalServiceMessagePipe {
* @throws TimeoutException
*/
public SignalServiceEnvelope read(long timeout, TimeUnit unit)
throws InvalidVersionException, IOException, TimeoutException
throws InvalidVersionException, IOException, TimeoutException, InterruptedException
{
return read(timeout, unit, new NullMessagePipeCallback());
}
Expand All @@ -91,7 +91,7 @@ public SignalServiceEnvelope read(long timeout, TimeUnit unit)
* @throws InvalidVersionException
*/
public SignalServiceEnvelope read(long timeout, TimeUnit unit, MessagePipeCallback callback)
throws TimeoutException, IOException, InvalidVersionException
throws TimeoutException, IOException, InvalidVersionException, InterruptedException
{
if (!credentialsProvider.isPresent()) {
throw new IllegalArgumentException("You can't read messages if you haven't specified credentials");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -127,22 +128,20 @@ public synchronized void disconnect() {
}

if (keepAliveSender != null) {
keepAliveSender.shutdown();
keepAliveSender.interrupt();
keepAliveSender = null;
}
}

public synchronized WebSocketRequestMessage readRequest(long timeoutMillis)
throws TimeoutException, IOException
throws TimeoutException, IOException, InterruptedException
{
if (client == null) {
throw new IOException("Connection closed!");
}

long startTime = System.currentTimeMillis();

while (client != null && incomingRequests.isEmpty() && elapsedTime(startTime) < timeoutMillis) {
Util.wait(this, Math.max(1, timeoutMillis - elapsedTime(startTime)));
if (client != null && incomingRequests.isEmpty()) {
wait(timeoutMillis);
}

if (incomingRequests.isEmpty() && client == null) throw new IOException("Connection closed!");
Expand Down Expand Up @@ -183,20 +182,16 @@ public synchronized void sendResponse(WebSocketResponseMessage response) throws
}
}

private synchronized void sendKeepAlive() throws IOException {
private synchronized Future<Pair<Integer, String>> sendKeepAlive() throws IOException {
if (keepAliveSender != null && client != null) {
byte[] message = WebSocketMessage.newBuilder()
.setType(WebSocketMessage.Type.REQUEST)
.setRequest(WebSocketRequestMessage.newBuilder()
.setId(System.currentTimeMillis())
.setPath("/v1/keepalive")
.setVerb("GET")
.build()).build()
.toByteArray();

if (!client.send(ByteString.of(message))) {
throw new IOException("Write failed!");
}
WebSocketRequestMessage request = WebSocketRequestMessage.newBuilder()
.setId(System.currentTimeMillis())
.setPath("/v1/keepalive")
.setVerb("GET")
.build();
return sendRequest(request);
} else {
return null;
}
}

Expand Down Expand Up @@ -249,7 +244,7 @@ public synchronized void onClosed(WebSocket webSocket, int code, String reason)
}

if (keepAliveSender != null) {
keepAliveSender.shutdown();
keepAliveSender.interrupt();
keepAliveSender = null;
}

Expand Down Expand Up @@ -312,23 +307,43 @@ private Pair<SSLSocketFactory, X509TrustManager> createTlsSocketFactory(TrustSto

private class KeepAliveSender extends Thread {

private AtomicBoolean stop = new AtomicBoolean(false);

public void run() {
while (!stop.get()) {
Future future = null;
boolean severed = false;

while (!interrupted()) {
try {
sleepTimer.sleep(TimeUnit.SECONDS.toMillis(KEEPALIVE_TIMEOUT_SECONDS));

Log.w(TAG, "Sending keep alive...");
sendKeepAlive();
} catch (Throwable e) {
Log.w(TAG, e);
if (future != null) {
try {
future.get(0L, TimeUnit.SECONDS);
} catch (ExecutionException | TimeoutException e){
severed = true;
}
}
} catch (InterruptedException e) {
Log.d(TAG, "Keep alive sender interrupted; exiting loop.");
break;
}
}
}

public void shutdown() {
stop.set(true);
if (severed) {
Log.d(TAG, "No response to previous keep-alive; forcing new connection.");

disconnect();
synchronized(WebSocketConnection.this) {
WebSocketConnection.this.notifyAll();
}
} else {
Log.d(TAG, "Sending keep alive...");

try {
future = sendKeepAlive();
} catch (IOException e) {
Log.d(TAG, "Failed to send keep alive: " + e.getMessage());
}
}
}
}
}

Expand Down

0 comments on commit daa5dc9

Please sign in to comment.