Skip to content

Commit

Permalink
Merge pull request #34 from techery/fail-on-cancel
Browse files Browse the repository at this point in the history
Status FAIL on cancel
  • Loading branch information
almozavr committed Mar 14, 2016
2 parents fe93e0d + e711309 commit 5f8059b
Show file tree
Hide file tree
Showing 10 changed files with 59 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

import io.techery.janet.AsyncActionService.QueuePoller.PollCallback;
import io.techery.janet.async.SyncPredicate;
Expand Down Expand Up @@ -51,6 +52,7 @@ final public class AsyncActionService extends ActionService {
private AsyncActionsRosterBase actionsRoster;
private final AsyncActionSynchronizer synchronizer;
private AsyncActionWrapperFactory actionWrapperFactory;
private final List<Object> runningActions;


public AsyncActionService(String url, AsyncClient client, Converter converter) {
Expand All @@ -69,6 +71,7 @@ public AsyncActionService(String url, AsyncClient client, Converter converter) {
this.connectActionQueue = new ConcurrentLinkedQueue<ActionHolder<ConnectAsyncAction>>();
this.disconnectActionQueue = new ConcurrentLinkedQueue<ActionHolder<DisconnectAsyncAction>>();
this.synchronizer = new AsyncActionSynchronizer(waitingErrorCallback);
this.runningActions = new CopyOnWriteArrayList<Object>();
loadActionWrapperFactory();
loadAsyncActionRooster();
client.setCallback(clientCallback);
Expand All @@ -86,27 +89,37 @@ public AsyncActionService(String url, AsyncClient client, Converter converter) {
if (handleConnectionAction(holder)) {
return;
}
AsyncActionWrapper wrapper = actionWrapperFactory.make(holder);
if (wrapper == null) {
throw new JanetInternalException(ERROR_GENERATOR);
}
if (!client.isConnected()) {
connect(false);
try {
runningActions.add(holder.action());
AsyncActionWrapper wrapper = actionWrapperFactory.make(holder);
if (wrapper == null) {
throw new JanetInternalException(ERROR_GENERATOR);
}
if (!client.isConnected()) {
connect(false);
}
sendAction(wrapper);
if (wrapper.getResponseEvent() == null) {
callback.onSuccess(holder);
}
} catch (CancelException ignored) {
} finally {
runningActions.remove(holder.action());
}
sendAction(wrapper);
}

@Override protected <A> void cancel(ActionHolder<A> holder) {
AsyncActionWrapper wrapper = actionWrapperFactory.make(holder);
if (wrapper == null) {
throw new JanetInternalException(ERROR_GENERATOR);
}
runningActions.remove(holder.action());
if (wrapper.getResponseEvent() != null) {
synchronizer.remove(wrapper);
}
}

private void sendAction(AsyncActionWrapper wrapper) throws AsyncServiceException {
private void sendAction(AsyncActionWrapper wrapper) throws AsyncServiceException, CancelException {
String responseEvent = wrapper.getResponseEvent();
if (responseEvent != null) {
synchronizer.put(responseEvent, wrapper);
Expand All @@ -119,11 +132,20 @@ private void sendAction(AsyncActionWrapper wrapper) throws AsyncServiceException
} else {
client.send(wrapper.getEvent(), new String(content));
}
throwIfCanceled(wrapper.action);
} catch (CancelException e) {
throw e;
} catch (Throwable t) {
throw new AsyncServiceException(t);
}
}

private void throwIfCanceled(Object action) throws CancelException {
if (!runningActions.contains(action)) {
throw new CancelException();
}
}

@SuppressWarnings("unchecked")
private <A> boolean handleConnectionAction(ActionHolder<A> holder) throws AsyncServiceException {
A action = holder.action();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ protected long getResponseTimeout() {

AsyncActionWrapper<?> wrapper = (AsyncActionWrapper<?>) o;

if (action == wrapper.action) return true;

return action != null ? action.equals(wrapper.action) : wrapper.action == null;

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ final public class CommandActionService extends ActionService {
}

@SuppressWarnings("unchecked")
@Override protected <A> void sendInternal(ActionHolder<A> holder) throws JanetException {
@Override protected <A> void sendInternal(ActionHolder<A> holder) throws CommandServiceException {
callback.onStart(holder);
CommandActionBase action = checkAndCast(holder.action());
if (action.isCanceled()) {
return;
}
try {
action.run(new ActionProgressInvoker((ActionHolder<CommandActionBase>) holder, callback));
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
* </ul>
* To process response, special annotations can be used:
* <ul>
* <li>{@linkplain Response @Response} for getting response body</li>
* <li>{@linkplain io.techery.janet.http.annotations.Response @Response} for getting response body</li>
* <li>{@linkplain Status @Status} for getting response status. Field types Integer, Long, int or long can be used
* to get status code or use boolean to know that request was sent successfully</li>
* <li>{@linkplain ResponseHeader @ResponseHeader} for getting response headers</li>
Expand Down Expand Up @@ -205,7 +205,4 @@ private static abstract class ActionRequestCallback<A> implements HttpClient.Req

}

private static class CancelException extends Throwable {}


}
13 changes: 1 addition & 12 deletions janet/src/main/java/io/techery/janet/ActionPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observables.ConnectableObservable;
import rx.subjects.PublishSubject;

/**
* End tool for sending and receiving actions with specific type using RXJava.
Expand All @@ -25,7 +24,6 @@ final public class ActionPipe<A> {
private final Observable<ActionState<A>> pipeline;
private final Action1<A> cancelFunc;
private final Scheduler defaultSubscribeOn;
private final PublishSubject<A> cancelSignal;

private ConnectableObservable<ActionState<A>> cachedPipeline;
private ConnectableObservable<A> cachedSuccessPipeline;
Expand All @@ -38,7 +36,6 @@ final public class ActionPipe<A> {
this.pipeline = pipelineFactory.call();
this.cancelFunc = cancelFunc;
this.defaultSubscribeOn = defaultSubscribeOn;
this.cancelSignal = PublishSubject.create();

createCachedPipeline();
createCachedSuccessPipeline();
Expand Down Expand Up @@ -133,7 +130,6 @@ public void send(A action, Scheduler subscribeOn) {
* @param action prepared action for cancellation
*/
public void cancel(A action) {
cancelSignal.onNext(action);
cancelFunc.call(action);
}

Expand All @@ -144,14 +140,7 @@ public void cancel(A action) {
* @param action prepared action to send
*/
public Observable<ActionState<A>> createObservable(final A action) {
return syncObservableFactory.call(action)
.takeUntil(cancelSignal
.asObservable()
.filter(new Func1<A, Boolean>() {
@Override public Boolean call(A a) {
return a == action;
}
}));
return syncObservableFactory.call(action);
}

/**
Expand Down
4 changes: 2 additions & 2 deletions janet/src/main/java/io/techery/janet/ActionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/**
* Base class that needs to extend to create a new service.
* Service processes Janet operations for supported action type with annotation
* defined in {@link #getSupportedAnnotationType()}
* defined in {@linkplain #getSupportedAnnotationType()}
*/
public abstract class ActionService {

Expand All @@ -29,7 +29,7 @@ final <A> void send(ActionHolder<A> holder) {

/**
* Getting action annotation type for using to create supported action.
* Actions with this annotation will be processed by the {@link ActionService}.
* Actions with this annotation will be processed by the {@linkplain ActionService}.
*/
abstract protected Class getSupportedAnnotationType();

Expand Down
6 changes: 6 additions & 0 deletions janet/src/main/java/io/techery/janet/CancelException.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.techery.janet;

public final class CancelException extends JanetException {

public CancelException() {}
}
4 changes: 3 additions & 1 deletion janet/src/main/java/io/techery/janet/Janet.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ private <A> void doSend(A action) {
}

private <A> void doCancel(A action) {
ActionHolder holder = ActionHolder.create(action);
pipeline.onNext(new ActionPair(holder, ActionState.fail(action, new CancelException())));
ActionService service = findService(action.getClass());
service.cancel(ActionHolder.create(action));
service.cancel(holder);
}

private ActionService findService(Class actionClass) {
Expand Down
2 changes: 2 additions & 0 deletions janet/src/main/java/io/techery/janet/JanetException.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
*/
public class JanetException extends Throwable {

JanetException() {}

public JanetException(String message, Throwable cause) {
super(message, cause);
}
Expand Down
11 changes: 8 additions & 3 deletions janet/src/test/java/io/techery/janet/JanetTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.techery.janet;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
Expand All @@ -15,11 +16,10 @@

import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -122,7 +122,12 @@ public void cancelAfterSend() {
}));
actionPipe.createObservable(action).subscribe(subscriber);
subscriber.unsubscribe();
assertSubscriberWithSingleValue(subscriber);
subscriber.assertNoErrors();
subscriber.assertUnsubscribed();
List<ActionState<TestAction>> values = subscriber.getOnNextEvents();
assertStatusCount(values, ActionState.Status.START, 1);
assertStatusCount(values, ActionState.Status.FAIL, 1);
Assert.assertThat(values.get(1).exception, instanceOf(CancelException.class));
verify(service, times(1)).cancel(any(ActionHolder.class));
}

Expand Down

0 comments on commit 5f8059b

Please sign in to comment.