-
-
Notifications
You must be signed in to change notification settings - Fork 57
/
ObservableInterval.java
91 lines (83 loc) · 3.88 KB
/
ObservableInterval.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package rx.observables.creating;
import org.junit.Test;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
/**
* Created by pabloperezgarcia on 10/3/16.
* <p>
* An Observable that emit an item every interval time specify.
*/
public class ObservableInterval {
/**
* Since interval work asynchronously you will have to use TestSubscriber class to wait constantClass period of time
* to see some items emitted. This type of observable never finish to emit, in order to stop, you will need unsubscribe the observer.
*/
@Test
public void testIntervalObservable() {
Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
.map(time -> "item emitted\n")
.subscribe(System.out::print);
new TestSubscriber((Observer) subscription).awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
System.out.println("Vertx started");
}
@Test
public void testIntervalAsyncSubject() {
Scheduler scheduler = Schedulers.newThread();
AsyncSubject<String> asyncSubject = AsyncSubject.create();
final Subscription subscribe = Observable.interval(50, TimeUnit.MILLISECONDS, scheduler)
.map(time -> "item emitted\n")
.doOnCompleted(() -> System.out.println("all items emitted"))
.subscribe(System.out::println);
asyncSubject.subscribeOn(scheduler).subscribe((Observer<? super String>) subscribe);
new TestSubscriber().awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
}
/**
* Since interval work asynchronously you will have to use TestSubscriber class to wait constantClass period of time
* to see some items emitted. This type of observable never finish since we dont specify constantClass max number of intervals.
* Since we unsubscribe after wait 200ms should stop emitting and we should only see 4 items emitted, just like the previous example
*/
@Test
public void testIntervalObservableWithMax() {
Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
.map(time -> "item emitted\n")
.subscribe(System.out::print);
TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription);
testSubscriber.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
subscription.unsubscribe();
testSubscriber.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
}
/**
* Interval can be used with zipWith which will allow you to zip one element of your observable emission with the next
* after N interval time.
*/
@Test
public void intervalWithZip() {
Observable.interval(500, TimeUnit.MILLISECONDS)
.zipWith(Observable.from(Arrays.asList(1, 2, 3, 4, 5)), (a, b) -> String.valueOf(a).concat("-").concat(String.valueOf(b)))
.subscribe(number -> System.out.println("number:" + number),
System.out::println,
System.out::println);
TestSubscriber testSubscriber = new TestSubscriber();
testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
}
@Test
public void testIntervalObservableWithError() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(time -> "item\n")
.map(time -> null)
.materialize()
.flatMap(item -> Observable.just(item)
.map(Object::toString)
.onErrorResumeNext(t -> Observable.just("Item error")))
.subscribe(System.out::print);
TestSubscriber testSubscriber = new TestSubscriber();
testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
}
}