Skip to content

Commit

Permalink
core: improve AsyncButOrdered
Browse files Browse the repository at this point in the history
Instead of marking the handle as not running by setting the handler's
value in the map to false, we now remove simply the key if there is no
handler running. This also means we no longer need to use a weak hash
map for this.

Also reduce the size of the synchronized blocks, mainly by scheduling
the handler outside of the synchronized(threadActiveMap) block.

Make some code better readable and add some more comments. Also do
start a new handler thread if the task threw.
  • Loading branch information
Flowdalic committed Nov 8, 2019
1 parent a7a298c commit 9d626bf
Showing 1 changed file with 41 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
*
* Copyright 2018 Florian Schmaus
* Copyright 2018-2019 Florian Schmaus
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@
*/
package org.jivesoftware.smack;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.WeakHashMap;
Expand Down Expand Up @@ -51,9 +52,17 @@
*/
public class AsyncButOrdered<K> {

/**
* A map with the currently pending runnables for a given key. Note that this is a weak hash map so we do not have
* to take care of removing the keys ourselfs from the map.
*/
private final Map<K, Queue<Runnable>> pendingRunnables = new WeakHashMap<>();

private final Map<K, Boolean> threadActiveMap = new WeakHashMap<>();
/**
* A marker map if there is an active thread for the given key. Holds the responsible handler thread if one is
* active, otherwise the key is non-existend in the map.
*/
private final Map<K, Handler> threadActiveMap = new HashMap<>();

private final Executor executor;

Expand All @@ -65,6 +74,14 @@ public AsyncButOrdered(Executor executor) {
this.executor = executor;
}

private void scheduleHandler(Handler handler) {
if (executor == null) {
AbstractXMPPConnection.asyncGo(handler);
} else {
executor.execute(handler);
}
}

/**
* Invoke the given {@link Runnable} asynchronous but ordered in respect to the given key.
*
Expand All @@ -73,6 +90,7 @@ public AsyncButOrdered(Executor executor) {
* @return true if a new thread was created
*/
public boolean performAsyncButOrdered(K key, Runnable runnable) {
// First check if a key queue already exists, create one if not.
Queue<Runnable> keyQueue;
synchronized (pendingRunnables) {
keyQueue = pendingRunnables.get(key);
Expand All @@ -82,29 +100,27 @@ public boolean performAsyncButOrdered(K key, Runnable runnable) {
}
}

// Then add the task to the queue.
keyQueue.add(runnable);

boolean newHandler;
// Finally check if there is already a handler working on that queue, create one if not.
Handler newlyCreatedHandler = null;
synchronized (threadActiveMap) {
Boolean threadActive = threadActiveMap.get(key);
if (threadActive == null) {
threadActive = false;
threadActiveMap.put(key, threadActive);
}
if (!threadActiveMap.containsKey(key)) {
newlyCreatedHandler = new Handler(keyQueue, key);

newHandler = !threadActive;
if (newHandler) {
Handler handler = new Handler(keyQueue, key);
threadActiveMap.put(key, true);
if (executor == null) {
AbstractXMPPConnection.asyncGo(handler);
} else {
executor.execute(handler);
}
// Mark that there is thread active for the given key. Note that this has to be done before scheduling
// the handler thread.
threadActiveMap.put(key, newlyCreatedHandler);
}
}

return newHandler;
if (newlyCreatedHandler != null) {
scheduleHandler(newlyCreatedHandler);
return true;
}

return false;
}

public Executor asExecutorFor(final K key) {
Expand Down Expand Up @@ -134,19 +150,22 @@ public void run() {
try {
runnable.run();
} catch (Throwable t) {
// The run() method threw, this handler thread is going to terminate because of that. Ensure we note
// that in the map.
// The run() method threw, this handler thread is going to terminate because of that. We create
// a new handler to continue working on the queue while throwing the throwable so that the
// executor can handle it.
Handler newlyCreatedHandler = new Handler(keyQueue, key);
synchronized (threadActiveMap) {
threadActiveMap.put(key, false);
threadActiveMap.put(key, newlyCreatedHandler);
}
scheduleHandler(newlyCreatedHandler);
throw t;
}
}

synchronized (threadActiveMap) {
// If the queue is empty, stop this handler, otherwise continue looping.
if (keyQueue.isEmpty()) {
threadActiveMap.put(key, false);
threadActiveMap.remove(key);
break mainloop;
}
}
Expand Down

0 comments on commit 9d626bf

Please sign in to comment.