Skip to content
This repository has been archived by the owner on Aug 11, 2023. It is now read-only.

Properly synchronize CancellableLoop. #211

Open
wants to merge 2 commits into
base: obsolete/master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions rosjava/src/main/java/org/ros/concurrent/CancellableLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,49 +16,45 @@

package org.ros.concurrent;

import com.google.common.base.Preconditions;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

/**
* An interruptable loop that can be run by an {@link ExecutorService}.
*
*
* @author [email protected] (Keith M. Hughes)
*/
public abstract class CancellableLoop implements Runnable {
private static final Object NOT_STARTED = null;

private final Object mutex;

/**
* {@code true} if the code has been run once, {@code false} otherwise.
*/
private boolean ranOnce = false;
private static final Object FINISHED = CancellableLoop.class;

/**
* The {@link Thread} the code will be running in.
* State of this loop. Possible values are:
* <ul>
* <li>{@link #NOT_STARTED} - corresponds to the 'initial' state, before the loop starts
* <li>{@link Thread} - 'running' state
* <li>{@link #FINISHED} - 'finished' state (this can be an arbitrary object)
* </ul>
*/
private Thread thread;

public CancellableLoop() {
mutex = new Object();
}
private final AtomicReference<Object> state = new AtomicReference<Object>();

@Override
public void run() {
synchronized (mutex) {
Preconditions.checkState(!ranOnce, "CancellableLoops cannot be restarted.");
ranOnce = true;
thread = Thread.currentThread();
Thread currentThread = Thread.currentThread();
if (!state.compareAndSet(NOT_STARTED, currentThread)) {
throw new IllegalStateException("CancellableLoops cannot be restarted.");
}

try {
setup();
while (!thread.isInterrupted()) {
while (!currentThread.isInterrupted()) {
loop();
}
} catch (InterruptedException e) {
handleInterruptedException(e);
} finally {
thread = null;
state.set(FINISHED);
}
}

Expand Down Expand Up @@ -86,15 +82,31 @@ protected void handleInterruptedException(InterruptedException e) {
* Interrupts the loop.
*/
public void cancel() {
if (thread != null) {
thread.interrupt();
for (; ; ) {
Object currentState = state.get();
if (currentState == NOT_STARTED) {
if (state.compareAndSet(NOT_STARTED, FINISHED)) {
// cancelled before starting
return;
} else {
// started before we cancelled, try again
continue;
}
// either finished, or we cancel it
} else if (currentState != FINISHED && state.compareAndSet(currentState, FINISHED)) {
// first to interrupt
Thread runningThread = (Thread) currentState;
runningThread.interrupt();
}
return;
}
}

/**
* @return {@code true} if the loop is running
*/
public boolean isRunning() {
return thread != null && !thread.isInterrupted();
Object currentState = state.get();
return currentState != NOT_STARTED && currentState != FINISHED;
}
}