diff --git a/janet-async/async-service/src/main/java/io/techery/janet/AsyncActionService.java b/janet-async/async-service/src/main/java/io/techery/janet/AsyncActionService.java index 23fa800..36a9bac 100644 --- a/janet-async/async-service/src/main/java/io/techery/janet/AsyncActionService.java +++ b/janet-async/async-service/src/main/java/io/techery/janet/AsyncActionService.java @@ -3,6 +3,7 @@ import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import io.techery.janet.AsyncActionService.QueuePoller.PollCallback; import io.techery.janet.async.SyncPredicate; @@ -51,6 +52,7 @@ final public class AsyncActionService extends ActionService { private AsyncActionsRosterBase actionsRoster; private final AsyncActionSynchronizer synchronizer; private AsyncActionWrapperFactory actionWrapperFactory; + private final List runningActions; public AsyncActionService(String url, AsyncClient client, Converter converter) { @@ -69,6 +71,7 @@ public AsyncActionService(String url, AsyncClient client, Converter converter) { this.connectActionQueue = new ConcurrentLinkedQueue>(); this.disconnectActionQueue = new ConcurrentLinkedQueue>(); this.synchronizer = new AsyncActionSynchronizer(waitingErrorCallback); + this.runningActions = new CopyOnWriteArrayList(); loadActionWrapperFactory(); loadAsyncActionRooster(); client.setCallback(clientCallback); @@ -86,14 +89,23 @@ public AsyncActionService(String url, AsyncClient client, Converter converter) { if (handleConnectionAction(holder)) { return; } - AsyncActionWrapper wrapper = actionWrapperFactory.make(holder); - if (wrapper == null) { - throw new JanetInternalException(ERROR_GENERATOR); - } - if (!client.isConnected()) { - connect(false); + try { + runningActions.add(holder.action()); + AsyncActionWrapper wrapper = actionWrapperFactory.make(holder); + if (wrapper == null) { + throw new JanetInternalException(ERROR_GENERATOR); + } + if (!client.isConnected()) { + connect(false); + } + sendAction(wrapper); + if (wrapper.getResponseEvent() == null) { + callback.onSuccess(holder); + } + } catch (CancelException ignored) { + } finally { + runningActions.remove(holder.action()); } - sendAction(wrapper); } @Override protected void cancel(ActionHolder holder) { @@ -101,12 +113,13 @@ public AsyncActionService(String url, AsyncClient client, Converter converter) { if (wrapper == null) { throw new JanetInternalException(ERROR_GENERATOR); } + runningActions.remove(holder.action()); if (wrapper.getResponseEvent() != null) { synchronizer.remove(wrapper); } } - private void sendAction(AsyncActionWrapper wrapper) throws AsyncServiceException { + private void sendAction(AsyncActionWrapper wrapper) throws AsyncServiceException, CancelException { String responseEvent = wrapper.getResponseEvent(); if (responseEvent != null) { synchronizer.put(responseEvent, wrapper); @@ -119,11 +132,20 @@ private void sendAction(AsyncActionWrapper wrapper) throws AsyncServiceException } else { client.send(wrapper.getEvent(), new String(content)); } + throwIfCanceled(wrapper.action); + } catch (CancelException e) { + throw e; } catch (Throwable t) { throw new AsyncServiceException(t); } } + private void throwIfCanceled(Object action) throws CancelException { + if (!runningActions.contains(action)) { + throw new CancelException(); + } + } + @SuppressWarnings("unchecked") private boolean handleConnectionAction(ActionHolder holder) throws AsyncServiceException { A action = holder.action(); diff --git a/janet-async/async-service/src/main/java/io/techery/janet/AsyncActionWrapper.java b/janet-async/async-service/src/main/java/io/techery/janet/AsyncActionWrapper.java index 4b09cde..a91921d 100644 --- a/janet-async/async-service/src/main/java/io/techery/janet/AsyncActionWrapper.java +++ b/janet-async/async-service/src/main/java/io/techery/janet/AsyncActionWrapper.java @@ -46,6 +46,8 @@ protected long getResponseTimeout() { AsyncActionWrapper wrapper = (AsyncActionWrapper) o; + if (action == wrapper.action) return true; + return action != null ? action.equals(wrapper.action) : wrapper.action == null; } diff --git a/janet-command/command-service/src/main/java/io/techery/janet/CommandActionService.java b/janet-command/command-service/src/main/java/io/techery/janet/CommandActionService.java index cf8d6db..6d26028 100644 --- a/janet-command/command-service/src/main/java/io/techery/janet/CommandActionService.java +++ b/janet-command/command-service/src/main/java/io/techery/janet/CommandActionService.java @@ -16,9 +16,12 @@ final public class CommandActionService extends ActionService { } @SuppressWarnings("unchecked") - @Override protected void sendInternal(ActionHolder holder) throws JanetException { + @Override protected void sendInternal(ActionHolder holder) throws CommandServiceException { callback.onStart(holder); CommandActionBase action = checkAndCast(holder.action()); + if (action.isCanceled()) { + return; + } try { action.run(new ActionProgressInvoker((ActionHolder) holder, callback)); } catch (Throwable t) { diff --git a/janet-http/http-service/src/main/java/io/techery/janet/HttpActionService.java b/janet-http/http-service/src/main/java/io/techery/janet/HttpActionService.java index 1f627f8..174d6cc 100644 --- a/janet-http/http-service/src/main/java/io/techery/janet/HttpActionService.java +++ b/janet-http/http-service/src/main/java/io/techery/janet/HttpActionService.java @@ -41,7 +41,7 @@ * * To process response, special annotations can be used: *