Skip to content

Commit

Permalink
fix status spread, add ThenFulfillment.
Browse files Browse the repository at this point in the history
  • Loading branch information
webee committed Jan 10, 2017
1 parent aa8951e commit ec387b4
Show file tree
Hide file tree
Showing 7 changed files with 233 additions and 45 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ To install the library add:
maven { url "https://jitpack.io" }
}
dependencies {
compile 'com.github.webee:promise4j:v2.0.1'
compile 'com.github.webee:promise4j:v2.1.0'
}
```
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ apply plugin: 'java'
apply plugin: 'maven'

group 'com.github.webee'
version '2.0.0'
version '2.1.0'

sourceCompatibility = 1.7
targetCompatibility = 1.7
Expand Down
145 changes: 104 additions & 41 deletions src/main/java/com/github/webee/promise/Promise.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@

import com.github.webee.promise.functions.Action;
import com.github.webee.promise.functions.Fulfillment;
import com.github.webee.promise.functions.ThenFulfillment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

/**
* Created by webee on 16/11/17.
Expand All @@ -20,16 +18,35 @@ enum State {
PENDING, FULFILLED, REJECTED
}

private Executor executor;
private Executor transformExecutor;
private ConcurrentLinkedQueue<ExecutableRunnable> handlers = new ConcurrentLinkedQueue<>();
private ConcurrentLinkedQueue<ExecutableRunnable> listeners = new ConcurrentLinkedQueue<>();
static class NullStatus {
static NullStatus instance = new NullStatus();
private NullStatus() {
}

@Override
public String toString() {
return "NullStatus{}";
}
}

// promise state.
private State state = State.PENDING;
private boolean isWaiting = false;

// promise data.
private T value;
private Object status;
private Object status = NullStatus.instance;
private Throwable reason;

// current handler/listener executor.
private Executor executor;
// current transformer executor.
private Executor transformExecutor;
// settled handler callbacks.
private ConcurrentLinkedQueue<ExecutableRunnable> handlers = new ConcurrentLinkedQueue<>();
// status listener callbacks.
private ConcurrentLinkedQueue<ExecutableRunnable> listeners = new ConcurrentLinkedQueue<>();

/**
* 通过实现构造一个Promise
*
Expand Down Expand Up @@ -63,7 +80,7 @@ public void update(Object s) {
}

public Promise(Fulfillment<T> fulfill) {
this(null, fulfill);
this(NullStatus.instance, fulfill);
}

private void settled() {
Expand All @@ -81,21 +98,31 @@ private void updated() {
}
}

private synchronized void listen(Runnable listener, Executor executor) {
executor = executor != null ? executor : PromiseExecutors.defaultExcutor();
private synchronized <V> void listen(final Action<V> action, Executor executor) {
executor = executor != null ? executor : PromiseExecutors.defaultExecutor();

Runnable listener = new Runnable() {
@Override
public void run() {
action.run((V) status);
}
};

if (status != NullStatus.instance) {
executor.execute(listener);
}

executor.execute(listener);
if (state == State.PENDING) {
listeners.add(new ExecutableRunnable(listener, executor));
}
}

private synchronized void listen(Runnable listener) {
listen(listener, PromiseExecutors.defaultExcutor());
private synchronized <V> void listen(Action<V> action) {
listen(action, PromiseExecutors.defaultExecutor());
}

private synchronized void handle(Handler handler, Executor executor) {
executor = executor != null ? executor : PromiseExecutors.defaultExcutor();
executor = executor != null ? executor : PromiseExecutors.defaultExecutor();

if (state == State.PENDING) {
handlers.add(new ExecutableRunnable(handler, executor));
Expand Down Expand Up @@ -126,8 +153,10 @@ private synchronized void _waiting_reject(Throwable r) {

private synchronized void update(Object s) {
if (state == State.PENDING) {
status = s;
updated();
if (s != status) {
status = s;
updated();
}
}
}

Expand Down Expand Up @@ -157,7 +186,12 @@ private synchronized void fulfill(Promise<T> p) {
// 进入等待状态, 防止其它的fulfill或者_reject
isWaiting = true;
try {
p.fulfilled(new Action<T>() {
p.status(PromiseExecutors.syncExecutor(), new Action<Object>() {
@Override
public void run(Object o) {
hp.update(o);
}
}).fulfilled(new Action<T>() {
public void run(T v) {
hp.waiting_fulfill(v);
}
Expand Down Expand Up @@ -245,11 +279,7 @@ public Promise<T> handleOn(Executor executor) {
* @return 当前Promise
*/
public <V> Promise<T> status(Executor executor, final Action<V> onUpdate) {
listen(new Runnable() {
public void run() {
onUpdate.run((V) status);
}
}, executor);
listen(onUpdate, executor);
return this;
}

Expand Down Expand Up @@ -321,6 +351,14 @@ public Promise<T> settled(Runnable onSettled) {
return settled(executor, onSettled);
}


private static <T> void doFulfill(Transition<T> transition, T v) {
if (v instanceof Promise) {
transition.fulfill((Promise<T>) v);
} else {
transition.fulfill(v);
}
}
/**
* 指定转换执行器
*
Expand Down Expand Up @@ -378,21 +416,21 @@ public Promise<T> thenCatch(CatchPromiseTransform<T> catchTransform) {
/**
* 进入下一个计算流程
*
* @param executor 执行器
* @param transform 变换回调
* @param <V> 变换目标类型
* @param executor 执行器
* @param s 初始状态
* @param thenFulfillment 下一步的实现
* @param <V> 变换目标类型
* @return 变换后Promise
*/
public <V> Promise<V> then(final Executor executor, final Transform<T, V> transform) {
return new Promise<>(new Fulfillment<V>() {
public <V> Promise<V> then(final Executor executor, Object s, final ThenFulfillment<T, V>thenFulfillment) {
return new Promise<>(s, new Fulfillment<V>() {
@Override
public void run(final Transition<V> transition) {
handle(new Handler() {
@Override
public void onFulfilled(T v) {
try {
V r = transform.run(v);
doFulfill(transition, r);
thenFulfillment.run(v, transition);
} catch (Throwable e) {
transition.reject(e);
}
Expand All @@ -407,6 +445,39 @@ public void onRejected(Throwable r) {
});
}

public <V> Promise<V> then(Object s, final ThenFulfillment<T, V>thenFulfillment) {
return then(transformExecutor, s, thenFulfillment);
}

public <V> Promise<V> then(final Executor executor, final ThenFulfillment<T, V>thenFulfillment) {
return then(executor, NullStatus.instance, thenFulfillment);
}

public <V> Promise<V> then(final ThenFulfillment<T, V>thenFulfillment) {
return then(NullStatus.instance, thenFulfillment);
}

/**
* 进入下一个计算流程
*
* @param executor 执行器
* @param transform 变换回调
* @param <V> 变换目标类型
* @return 变换后Promise
*/
public <V> Promise<V> then(final Executor executor, final Transform<T, V> transform) {
return then(executor, new ThenFulfillment<T, V>() {
@Override
public void run(T v, Transition<V> transition) {
try {
doFulfill(transition, transform.run(v));
} catch (Throwable e) {
transition.reject(e);
}
}
});
}

public <V> Promise<V> then(final Transform<T, V> transform) {
return then(transformExecutor, transform);
}
Expand All @@ -419,7 +490,7 @@ public <V> Promise<V> then(final Transform<T, V> transform) {
* @param <V> 变换目标值类型
* @return 变换后Promise
*/
public <V> Promise<V> then(Executor executor, PromiseTransform<T, V> transform) {
public <V> Promise<V> then(Executor executor, final PromiseTransform<T, V> transform) {
return then(executor, (Transform<T, V>) transform);
}

Expand Down Expand Up @@ -498,14 +569,6 @@ void execute() {
}
}

private static <T> void doFulfill(Transition<T> transition, T v) {
if (v instanceof Promise) {
transition.fulfill((Promise<T>) v);
} else {
transition.fulfill(v);
}
}

/**
* 生成一个fulfilled值为v的Promise
*
Expand Down Expand Up @@ -560,7 +623,7 @@ public static <T> Promise<Iterable<T>> all(final Iterable<Promise<T>> promises)
return new Promise<>(new Fulfillment<Iterable<T>>() {
@Override
public void run(final Transition<Iterable<T>> transition) {
PromiseExecutors.defaultExcutor().execute(new Runnable() {
PromiseExecutors.defaultExecutor().execute(new Runnable() {
@Override
public void run() {
int count = 0;
Expand Down
13 changes: 12 additions & 1 deletion src/main/java/com/github/webee/promise/PromiseExecutors.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class PromiseExecutors {
private static final AtomicReference<PromiseExecutors> INSTANCE = new AtomicReference<>();

private final Executor defaultExecutor;
private final Executor syncExecutor;

private static PromiseExecutors getInstance() {
for (;;) {
Expand All @@ -28,9 +29,19 @@ private static PromiseExecutors getInstance() {

private PromiseExecutors() {
defaultExecutor = Executors.newCachedThreadPool();
syncExecutor = new Executor() {
@Override
public void execute(Runnable command) {
command.run();
}
};
}

public static Executor defaultExcutor() {
public static Executor defaultExecutor() {
return getInstance().defaultExecutor;
}

public static Executor syncExecutor() {
return getInstance().syncExecutor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.github.webee.promise.functions;

import com.github.webee.promise.Transition;

/**
* Created by webee on 16/11/18.
*/

public interface ThenFulfillment<S, T> {
void run(S val, Transition<T> transition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private synchronized void toStop() {

public synchronized void start() {
if (tryRun()) {
PromiseExecutors.defaultExcutor().execute(new Runnable() {
PromiseExecutors.defaultExecutor().execute(new Runnable() {
@Override
public void run() {
do {
Expand Down
Loading

0 comments on commit ec387b4

Please sign in to comment.