From 09b4cbab760068ed8ce5ce7b9e8909cb8bd97e21 Mon Sep 17 00:00:00 2001 From: nnbb923 Date: Mon, 26 Mar 2018 18:41:25 +0800 Subject: [PATCH 1/3] Add files via upload f --- .../rxjava/share/practices/Practice1.java | 72 ++--- .../rxjava/share/practices/Practice2.java | 100 +++---- .../rxjava/share/practices/Practice3.java | 123 ++++----- .../rxjava/share/practices/Practice4.java | 100 +++---- .../rxjava/share/practices/Practice5.java | 245 ++++++++++-------- 5 files changed, 333 insertions(+), 307 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..31ecd25 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -1,35 +1,37 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; - -/** - * @author Baoyi Chen - */ -public class Practice1 { - - /* - * 举例如下: - * 参数 Observable["a","b","c"] - * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 - */ - public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.Observable; + +/** + * @author Baoyi Chen + */ +public class Practice1 { + + /* + * 举例如下: + * 参数 Observable["a","b","c"] + * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 + */ + public Observable> indexable(Observable observable) { + return observable.map(x -> new Tuple2<>(1, x)).scan((a, b) -> { + return new Tuple2<>(a.getV1() + 1, b.getV2()); + }); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..8f9af95 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -1,49 +1,51 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; -import io.reactivex.Single; - -import java.util.Map; - -/** - * @author Baoyi Chen - */ -public class Practice2 { - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] - */ - public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Single[Map{a=2, b=1, c=2}] - */ - public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import java.util.Map; + +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.Observable; +import io.reactivex.Single; + +/** + * @author Baoyi Chen + */ +public class Practice2 { + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] + */ + public Observable> wordCount1(Observable words) { + return words.groupBy(x -> x) + .flatMap(g -> { + return g.count().map(count -> new Tuple2<>(g.getKey(), count.intValue())).toObservable(); + }); + } + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Single[Map{a=2, b=1, c=2}] + */ + public Single> wordCount2(Observable words) { + return wordCount1(words).toMap(v1 -> v1.getV1(), v2 -> v2.getV2()); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..8dc00bb 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -1,60 +1,63 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import io.reactivex.Maybe; -import io.reactivex.Observable; - -/** - * @author Baoyi Chen - */ -public class Practice3 { - - /* - * 根据iterate的结果求和 - */ - public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * 举例: - * 5 - * / \ - * 6 7 - * / \ \ - * 4 3 nil - * - * return Observable[4, 3, 6, 7, 5] 顺序无关 - */ - public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } - - public static class Node { - public Node left; - public Node right; - public int value; - - public Node(Node left, Node right, int value) { - this.left = left; - this.right = right; - this.value = value; - } - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import io.reactivex.Maybe; +import io.reactivex.Observable; + +/** + * @author Baoyi Chen + */ +public class Practice3 { + + /* + * 根据iterate的结果求和 + */ + public Maybe sum(Observable observable) { + return iterate(observable).scan((a, b) -> a + b).lastElement(); + } + + /* + * 举例: + * 5 + * / \ + * 6 7 + * / \ \ + * 4 3 nil + * + * return Observable[4, 3, 6, 7, 5] 顺序无关 + */ + public Observable iterate(Observable observable) { + return observable.flatMap(x -> { + Observable o1 = (x.left == null) ? Observable.empty() : iterate(Observable.just(x.left)); + Observable o2 = (x.right == null) ? Observable.empty() : iterate(Observable.just(x.right)); + return Observable.merge(o1, o2, Observable.just(x.value)); + }); + } + + public static class Node { + public Node left; + public Node right; + public int value; + + public Node(Node left, Node right, int value) { + this.left = left; + this.right = right; + this.value = value; + } + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..95ce424 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -1,50 +1,50 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import io.reactivex.Observable; - - -/** - * @author Baoyi Chen - */ -public class Practice4 { - - /* - * 举例: - * 参数observable = Observable["a", "b", "c"] - * 参数observer在消费observable时,每个元素都在独立的线程 - * - * thread 1 --------------- - * |-----------| ["a"] | - * | --------------- - * | - * ------------------------- ---------- |thread 2 --------------- - * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | - * ------------------------- ---------- | --------------- - * | - * |thread 3 --------------- - * |-----------| ["c"] | - * --------------- - * - */ - public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; + + +/** + * @author Baoyi Chen + */ +public class Practice4 { + + /* + * 举例: + * 参数observable = Observable["a", "b", "c"] + * 参数observer在消费observable时,每个元素都在独立的线程 + * + * thread 1 --------------- + * |-----------| ["a"] | + * | --------------- + * | + * ------------------------- ---------- |thread 2 --------------- + * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | + * ------------------------- ---------- | --------------- + * | + * |thread 3 --------------- + * |-----------| ["c"] | + * --------------- + * + */ + public Observable runInMultiThread(Observable observable) { + return observable.flatMap(x -> Observable.just(x).subscribeOn(Schedulers.newThread())); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..f9e34db 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -1,113 +1,132 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; - -/** - * @author Baoyi Chen - */ -public class Practice5 { - - /* - * example: - * param: Observable["a","b","c"] - * return: Single[3] - */ - public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable[["a", "b", "c"], ["b", "c", "d"]] - * return: Observable["a", "b", "c","b", "c", "d"] - */ - public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a", "a", "b", "b", "c"] - * return: Observable["a", "b", "c"] - */ - public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 - * return: Observable[3, 4] - */ - public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , index = 2 - * return: Maybe[3] - */ - public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a", "b"] , count = 2 - * return: Observable["a", "b", "a", "b"] - */ - public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * example: - * param: Observable["a", "b", "c"], 1, SECONDS - * return: Observable["a", "b", "c"], 每个元素都延迟1秒 - */ - public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); - } - -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; + +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + +/** + * @author Baoyi Chen + */ +public class Practice5 { + + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { + AtomicLong i = new AtomicLong(0L); + return source.flatMap(x -> { + return Observable.just(i.incrementAndGet()); + }).lastOrError(); + } + + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { +// return source.flatMap(list -> Observable.fromIterable(list)); + return source.flatMap(Observable::fromIterable); +// return source.flatMapIterable(x -> x);//rx java2.0 + } + + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { + List list = new ArrayList<>(); + return source.concatMap(x -> { + if(!list.contains(x)) { + list.add(x); return Observable.just(x); + } + else return Observable.empty(); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return source.concatMap(x -> { + if(conditon.test(x)) return Observable.just(x); else return Observable.empty(); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + AtomicInteger i = new AtomicInteger(0); + return source.flatMap(x -> { + if(i.getAndIncrement() == index) return Observable.just(x); + else { return Observable.empty(); } + }).firstElement(); + } + + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { + return Observable.range(0, count).concatMap(x -> source); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { + return Observable.fromIterable(source).concatMap(x -> x); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + return Observable.fromIterable(source).flatMap(x -> x); + } + + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return source.concatMap(x ->Observable.just(x).delay(delay, unit)); + } +} From 05b276f4fc746359cccf7622252564068a9a5030 Mon Sep 17 00:00:00 2001 From: nnbb923 Date: Tue, 27 Mar 2018 17:43:45 +0800 Subject: [PATCH 2/3] fixed --- Practice5.java | 114 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 Practice5.java diff --git a/Practice5.java b/Practice5.java new file mode 100644 index 0000000..1e09692 --- /dev/null +++ b/Practice5.java @@ -0,0 +1,114 @@ +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + +/** + * @author Baoyi Chen + */ +public class Practice5 { + + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { + return source.reduce(0L, (r, s) -> r + 1); + } + + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return source.flatMap(Observable::fromIterable); + } + + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { + return source.groupBy(g->g).map(x->x.getKey()); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return source.concatMap(x -> { + if(conditon.test(x)) return Observable.just(x); else return Observable.empty(); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + return source.skip(index).take(1).firstElement(); + } + + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { + return Observable.range(0, count).concatMap(x -> source); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { + return Observable.fromIterable(source).concatMap(x -> x); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + return Observable.fromIterable(source).flatMap(x -> x); + } + + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return source.concatMap(x ->Observable.just(x).delay(delay, unit)); + } +} From 70fe503c753bd6b4d42a47e22b18d68bd90429ce Mon Sep 17 00:00:00 2001 From: wanlq Date: Tue, 27 Mar 2018 21:38:25 +0800 Subject: [PATCH 3/3] v2 --- Practice5.java | 228 ++++++++-------- .../rxjava/share/practices/Practice1.java | 74 +++--- .../rxjava/share/practices/Practice2.java | 102 ++++---- .../rxjava/share/practices/Practice3.java | 126 ++++----- .../rxjava/share/practices/Practice4.java | 100 +++---- .../rxjava/share/practices/Practice5.java | 246 ++++++++---------- 6 files changed, 429 insertions(+), 447 deletions(-) diff --git a/Practice5.java b/Practice5.java index 1e09692..539bb04 100644 --- a/Practice5.java +++ b/Practice5.java @@ -1,114 +1,114 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; - -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - -/** - * @author Baoyi Chen - */ -public class Practice5 { - - /* - * example: - * param: Observable["a","b","c"] - * return: Single[3] - */ - public Single count(Observable source) { - return source.reduce(0L, (r, s) -> r + 1); - } - - /* - * example: - * param: Observable[["a", "b", "c"], ["b", "c", "d"]] - * return: Observable["a", "b", "c","b", "c", "d"] - */ - public Observable convert(Observable> source) { - return source.flatMap(Observable::fromIterable); - } - - /* - * example: - * param: Observable["a", "a", "b", "b", "c"] - * return: Observable["a", "b", "c"] - */ - public Observable distinct(Observable source) { - return source.groupBy(g->g).map(x->x.getKey()); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 - * return: Observable[3, 4] - */ - public Observable filter(Observable source, Predicate conditon) { - return source.concatMap(x -> { - if(conditon.test(x)) return Observable.just(x); else return Observable.empty(); - }); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , index = 2 - * return: Maybe[3] - */ - public Maybe elementAt(Observable source, int index) { - return source.skip(index).take(1).firstElement(); - } - - /* - * example: - * param: Observable["a", "b"] , count = 2 - * return: Observable["a", "b", "a", "b"] - */ - public Observable repeat(Observable source, int count) { - return Observable.range(0, count).concatMap(x -> source); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable concat(List> source) { - return Observable.fromIterable(source).concatMap(x -> x); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable merge(List> source) { - return Observable.fromIterable(source).flatMap(x -> x); - } - - /* - * example: - * param: Observable["a", "b", "c"], 1, SECONDS - * return: Observable["a", "b", "c"], 每个元素都延迟1秒 - */ - public Observable delayAll(Observable source, long delay, TimeUnit unit) { - return source.concatMap(x ->Observable.just(x).delay(delay, unit)); - } -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + +/** + * @author Baoyi Chen + */ +public class Practice5 { + + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { + return source.reduce(0L, (r, s) -> r + 1); + } + + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return source.flatMap(Observable::fromIterable); + } + + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { + return source.groupBy(g->g).map(x->x.getKey()); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return source.concatMap(x -> { + if(conditon.test(x)) return Observable.just(x); else return Observable.empty(); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + return source.skip(index).take(1).firstElement(); + } + + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { + return Observable.range(0, count).concatMap(x -> source); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { + return Observable.fromIterable(source).concatMap(x -> x); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + return Observable.fromIterable(source).flatMap(x -> x); + } + + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return source.concatMap(x ->Observable.just(x).delay(delay, unit)); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index 31ecd25..3c2a7be 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -1,37 +1,37 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; - -/** - * @author Baoyi Chen - */ -public class Practice1 { - - /* - * 举例如下: - * 参数 Observable["a","b","c"] - * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 - */ - public Observable> indexable(Observable observable) { - return observable.map(x -> new Tuple2<>(1, x)).scan((a, b) -> { - return new Tuple2<>(a.getV1() + 1, b.getV2()); - }); - } -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.Observable; + +/** + * @author Baoyi Chen + */ +public class Practice1 { + + /* + * 举例如下: + * 参数 Observable["a","b","c"] + * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 + */ + public Observable> indexable(Observable observable) { + return observable.map(x -> new Tuple2<>(1, x)).scan((a, b) -> { + return new Tuple2<>(a.getV1() + 1, b.getV2()); + }); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 8f9af95..1045764 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -1,51 +1,51 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import java.util.Map; - -import cn.nextop.rxjava.share.util.type.Tuple2; -import io.reactivex.Observable; -import io.reactivex.Single; - -/** - * @author Baoyi Chen - */ -public class Practice2 { - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] - */ - public Observable> wordCount1(Observable words) { - return words.groupBy(x -> x) - .flatMap(g -> { - return g.count().map(count -> new Tuple2<>(g.getKey(), count.intValue())).toObservable(); - }); - } - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Single[Map{a=2, b=1, c=2}] - */ - public Single> wordCount2(Observable words) { - return wordCount1(words).toMap(v1 -> v1.getV1(), v2 -> v2.getV2()); - } -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import java.util.Map; + +import cn.nextop.rxjava.share.util.type.Tuple2; +import io.reactivex.Observable; +import io.reactivex.Single; + +/** + * @author Baoyi Chen + */ +public class Practice2 { + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] + */ + public Observable> wordCount1(Observable words) { + return words.groupBy(x -> x) + .flatMap(g -> { + return g.count().map(count -> new Tuple2<>(g.getKey(), count.intValue())).toObservable(); + }); + } + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Single[Map{a=2, b=1, c=2}] + */ + public Single> wordCount2(Observable words) { + return wordCount1(words).toMap(v1 -> v1.getV1(), v2 -> v2.getV2()); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index 8dc00bb..6b21a76 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -1,63 +1,63 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import io.reactivex.Maybe; -import io.reactivex.Observable; - -/** - * @author Baoyi Chen - */ -public class Practice3 { - - /* - * 根据iterate的结果求和 - */ - public Maybe sum(Observable observable) { - return iterate(observable).scan((a, b) -> a + b).lastElement(); - } - - /* - * 举例: - * 5 - * / \ - * 6 7 - * / \ \ - * 4 3 nil - * - * return Observable[4, 3, 6, 7, 5] 顺序无关 - */ - public Observable iterate(Observable observable) { - return observable.flatMap(x -> { - Observable o1 = (x.left == null) ? Observable.empty() : iterate(Observable.just(x.left)); - Observable o2 = (x.right == null) ? Observable.empty() : iterate(Observable.just(x.right)); - return Observable.merge(o1, o2, Observable.just(x.value)); - }); - } - - public static class Node { - public Node left; - public Node right; - public int value; - - public Node(Node left, Node right, int value) { - this.left = left; - this.right = right; - this.value = value; - } - } -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import io.reactivex.Maybe; +import io.reactivex.Observable; + +/** + * @author Baoyi Chen + */ +public class Practice3 { + + /* + * 根据iterate的结果求和 + */ + public Maybe sum(Observable observable) { + return iterate(observable).scan((a, b) -> a + b).lastElement(); + } + + /* + * 举例: + * 5 + * / \ + * 6 7 + * / \ \ + * 4 3 nil + * + * return Observable[4, 3, 6, 7, 5] 顺序无关 + */ + public Observable iterate(Observable observable) { + return observable.flatMap(x -> { + Observable o1 = (x.left == null) ? Observable.empty() : iterate(Observable.just(x.left)); + Observable o2 = (x.right == null) ? Observable.empty() : iterate(Observable.just(x.right)); + return Observable.merge(o1, o2, Observable.just(x.value)); + }); + } + + public static class Node { + public Node left; + public Node right; + public int value; + + public Node(Node left, Node right, int value) { + this.left = left; + this.right = right; + this.value = value; + } + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 95ce424..0653b1c 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -1,50 +1,50 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - - -import io.reactivex.Observable; -import io.reactivex.schedulers.Schedulers; - - -/** - * @author Baoyi Chen - */ -public class Practice4 { - - /* - * 举例: - * 参数observable = Observable["a", "b", "c"] - * 参数observer在消费observable时,每个元素都在独立的线程 - * - * thread 1 --------------- - * |-----------| ["a"] | - * | --------------- - * | - * ------------------------- ---------- |thread 2 --------------- - * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | - * ------------------------- ---------- | --------------- - * | - * |thread 3 --------------- - * |-----------| ["c"] | - * --------------- - * - */ - public Observable runInMultiThread(Observable observable) { - return observable.flatMap(x -> Observable.just(x).subscribeOn(Schedulers.newThread())); - } -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + + +import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; + + +/** + * @author Baoyi Chen + */ +public class Practice4 { + + /* + * 举例: + * 参数observable = Observable["a", "b", "c"] + * 参数observer在消费observable时,每个元素都在独立的线程 + * + * thread 1 --------------- + * |-----------| ["a"] | + * | --------------- + * | + * ------------------------- ---------- |thread 2 --------------- + * |Observable["a","b","c"]|----|Observer|----|-----------| ["b"] | + * ------------------------- ---------- | --------------- + * | + * |thread 3 --------------- + * |-----------| ["c"] | + * --------------- + * + */ + public Observable runInMultiThread(Observable observable) { + return observable.flatMap(x -> Observable.just(x).subscribeOn(Schedulers.newThread())); + } +} diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index f9e34db..0bd9a43 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -1,132 +1,114 @@ -/* - * Copyright 2016-2017 Leon Chen - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.nextop.rxjava.share.practices; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Predicate; - -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - -/** - * @author Baoyi Chen - */ -public class Practice5 { - - /* - * example: - * param: Observable["a","b","c"] - * return: Single[3] - */ - public Single count(Observable source) { - AtomicLong i = new AtomicLong(0L); - return source.flatMap(x -> { - return Observable.just(i.incrementAndGet()); - }).lastOrError(); - } - - /* - * example: - * param: Observable[["a", "b", "c"], ["b", "c", "d"]] - * return: Observable["a", "b", "c","b", "c", "d"] - */ - public Observable convert(Observable> source) { -// return source.flatMap(list -> Observable.fromIterable(list)); - return source.flatMap(Observable::fromIterable); -// return source.flatMapIterable(x -> x);//rx java2.0 - } - - /* - * example: - * param: Observable["a", "a", "b", "b", "c"] - * return: Observable["a", "b", "c"] - */ - public Observable distinct(Observable source) { - List list = new ArrayList<>(); - return source.concatMap(x -> { - if(!list.contains(x)) { - list.add(x); return Observable.just(x); - } - else return Observable.empty(); - }); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 - * return: Observable[3, 4] - */ - public Observable filter(Observable source, Predicate conditon) { - return source.concatMap(x -> { - if(conditon.test(x)) return Observable.just(x); else return Observable.empty(); - }); - } - - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , index = 2 - * return: Maybe[3] - */ - public Maybe elementAt(Observable source, int index) { - AtomicInteger i = new AtomicInteger(0); - return source.flatMap(x -> { - if(i.getAndIncrement() == index) return Observable.just(x); - else { return Observable.empty(); } - }).firstElement(); - } - - /* - * example: - * param: Observable["a", "b"] , count = 2 - * return: Observable["a", "b", "a", "b"] - */ - public Observable repeat(Observable source, int count) { - return Observable.range(0, count).concatMap(x -> source); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable concat(List> source) { - return Observable.fromIterable(source).concatMap(x -> x); - } - - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable merge(List> source) { - return Observable.fromIterable(source).flatMap(x -> x); - } - - /* - * example: - * param: Observable["a", "b", "c"], 1, SECONDS - * return: Observable["a", "b", "c"], 每个元素都延迟1秒 - */ - public Observable delayAll(Observable source, long delay, TimeUnit unit) { - return source.concatMap(x ->Observable.just(x).delay(delay, unit)); - } -} +/* + * Copyright 2016-2017 Leon Chen + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.nextop.rxjava.share.practices; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + +/** + * @author Baoyi Chen + */ +public class Practice5 { + + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { + return source.reduce(0L, (r, s) -> r + 1); + } + + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return source.flatMap(Observable::fromIterable); + } + + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { + return source.groupBy(g -> g).map(x -> x.getKey()); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return source.concatMap(x -> { + if(conditon.test(x)) return Observable.just(x); else return Observable.empty(); + }); + } + + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + return source.skip(index).firstElement(); + } + + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { + return Observable.range(0, count).concatMap(x -> source); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { + return Observable.fromIterable(source).concatMap(x -> x); + } + + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + return Observable.fromIterable(source).flatMap(x -> x); + } + + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return source.concatMap(x ->Observable.just(x).delay(delay, unit)); + } +}