<style> dl { margin-top: 0; margin-bottom: 0; margin-left: 1.5em; } dl#outer>dt { font-weight: bold; font-size: 120%; margin-right: -1em; } dt { font-weight: normal; margin-right: -1.5em; } dl > dt::before { content: "…"; } dl#outer > dt::before { content: ""; } dd::before { content: ": "; } dd.sub::before { content: ""; } dt { float: left; clear: left; } dd { float: left; margin-start: 0; -webkit-margin-start: 0; margin-left: 1.5em; } dd.sub { float: none; margin-left: 0; } dd.a::after { content: " ⓐⓈ"; } dd.b::after { content: " ⓑ"; } dd.c::after { content: " ⓒ"; } dd.m::after { content: " ⓜ"; } dd.s::after { content: " Ⓢ"; } hr { clear: both; } dl dt, dl dd { background: #fff; } dl dl dt, dl dl dd { background: #eef; } dl dl dl dt, dl dl dl dd { background: #ffe; } dl dl dl dl dt, dl dl dl dl dd { background: #efe; } dl dl dl dl dl dt, dl dl dl dl dl dd { background: #fef; } dl dl dl dl dl dl dt, dl dl dl dl dl dl dd { background: #fee; } </style>

This tree can help you find the Observable operator you’re looking for.

I want to create a new Observable
that emits a particular item
just( )
that was returned from a function called at subscribe-time
start( )
anew for each subscriber
toAsync( )
that was returned from an Action called at subscribe-time
fromAction( )
that was returned from a Callable called at subscribe-time
fromCallable( )
that was returned from a Runnable called at subscribe-time
fromRunnable( )
after a specified delay
timer( )
that emits a particular set of 1–10 items
from( )
that obtains its sequence from an Array or Iterable
from( )
by retrieving it from a Future
deferFuture( )
that obtains its sequence from a Future
from( )
with a timeout
from( )
that obtains its sequence from an Action called periodically
runAsync( )
that emits a sequence of items repeatedly
repeat( )
as long as a predicate remains true
whileDo( )
but at least once, no matter what
doWhile( )
from scratch, with custom logic
create( )
for each observer that subscribes
defer( )
that emits a sequence of integers
range( )
at particular intervals of time
interval( )
after a specified delay
timer( )
that completes without emitting items
empty( )
that does nothing at all
never( )
I want to create an Observable by combining other Observables
and emitting all of the items from all of the Observables in whatever order they are received
where the source Observables are passed to the operator as parameters
merge(…)
where the source Observables are found in an Array
merge(sequences)
where the source Observables are found in an Iterable or Observable
merge(sequences)
but I only want to process a certain number of them at once
merge(sequences,maxConcurrent)
but not forwarding any error notifications until all source Observables have terminated
mergeDelayError( )
and emitting all of the items from all of the Observables, one Observable at a time
concat( )
by combining the items from two or more Observables sequentially to come up with new items to emit
whenever each of the Observables has emitted a new item
zip( )
whenever any of the Observables has emitted a new item
combineLatest( )
whenever an item is emitted by one Observable in a window defined by an item emitted by another
join( )
based on an Observable that emits all items that have fallen in such a window
groupJoin( )
by means of Pattern and Plan intermediaries
and/then/when
and emitting the items from only the most-recently emitted of those Observables
switchOnNext( )
and mirroring only one of those Observables (which one depends on a parameter I am passed)
switchCase( )
reducing an Observable that emits many Observables to one that emits as many Observables as I have processes to process them on
parallelMerge(…)
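The difference between "whenever each of the Observables has emitted a new item" (`zip`) and "whenever any of the Observables has emitted a new item" (`combineLatest`) is easier to see on a concrete timeline. The following is a minimal plain-Java sketch, not RxJava code: it models two sources as a single list of emissions in arrival order. The class `CombineSketch`, the `Emission` type, and both method names are hypothetical and exist only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java model of zip vs. combineLatest semantics; this is not RxJava code. */
final class CombineSketch {

    /** One emission on a merged timeline: which source (0 or 1) produced which value. */
    static final class Emission {
        final int source;
        final String value;

        Emission(int source, String value) {
            this.source = source;
            this.value = value;
        }
    }

    /** zip: pair the n-th item of source 0 with the n-th item of source 1. */
    static List<String> zip(List<Emission> timeline, BinaryOperator<String> combiner) {
        Deque<String> queue0 = new ArrayDeque<>();
        Deque<String> queue1 = new ArrayDeque<>();
        List<String> out = new ArrayList<>();
        for (Emission e : timeline) {
            (e.source == 0 ? queue0 : queue1).add(e.value);
            // Emit only once both sources have an unpaired item waiting.
            if (!queue0.isEmpty() && !queue1.isEmpty()) {
                out.add(combiner.apply(queue0.poll(), queue1.poll()));
            }
        }
        return out;
    }

    /** combineLatest: on every emission, combine the latest item seen from each source. */
    static List<String> combineLatest(List<Emission> timeline, BinaryOperator<String> combiner) {
        String latest0 = null;
        String latest1 = null;
        List<String> out = new ArrayList<>();
        for (Emission e : timeline) {
            if (e.source == 0) {
                latest0 = e.value;
            } else {
                latest1 = e.value;
            }
            // Nothing is emitted until both sources have produced at least one item.
            if (latest0 != null && latest1 != null) {
                out.add(combiner.apply(latest0, latest1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Source 0 emits a, b, c; source 1 emits 1, 2; interleaved in this order.
        List<Emission> timeline = Arrays.asList(
                new Emission(0, "a"), new Emission(0, "b"), new Emission(1, "1"),
                new Emission(0, "c"), new Emission(1, "2"));
        System.out.println(zip(timeline, String::concat));
        System.out.println(combineLatest(timeline, String::concat));
    }
}
```

On this timeline the `zip` model pairs items strictly by position (`a1`, `b2`), while the `combineLatest` model emits on every new item once both sources have started (`b1`, `c1`, `c2`).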
I want to emit the items from an Observable after transforming them
one at a time with a function
map( )
by casting them to a particular type
cast( )
by emitting all of the items emitted by corresponding Observables
flatMap( )
combined with the original items by means of a function
mergeMap(collectionSelector,resultSelector)
by emitting all of the items in corresponding Iterables
mergeMapIterable(collectionSelector)
combined with the original items by means of a function
mergeMapIterable(collectionSelector,resultSelector)
based on all of the items that preceded them
scan( )
by combining them sequentially with the items in an Iterable by means of a function
zip(iterable,zipFunction)
by attaching a timestamp to them
timestamp( )
into an indicator of the amount of time that elapsed before the emission of the item
timeInterval( )
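To make "based on all of the items that preceded them" concrete, here is a small plain-Java sketch of the running accumulation that `scan` performs, modeled on a list rather than an Observable. It assumes the no-seed variant, in which the first item is passed through unchanged. `ScanSketch` and its method are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

/** Plain-Java model of scan semantics (no seed value); this is not RxJava code. */
final class ScanSketch {

    /** Returns every intermediate accumulation, one per source item. */
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> out = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : source) {
            // The first item is emitted as-is; later items are folded into the accumulator.
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            out.add(acc);
        }
        return out;
    }

    public static void main(String[] args) {
        // Running sum of 1, 2, 3, 4.
        System.out.println(scan(Arrays.asList(1, 2, 3, 4), Integer::sum));
    }
}
```

Unlike `reduce`, which would yield only the final value, this model emits each partial result, so a running sum of 1, 2, 3, 4 gives 1, 3, 6, 10.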
I want to shift the items emitted by an Observable forward in time before reemitting them
delay(delay,unit)
with the amount of the shift calculated on a per-item basis
delay(itemDelay)
and the initial subscription to the Observable shifted as well
delay(subscriptionDelay,itemDelay)
I want to transform items and notifications from an Observable into items and reemit them
by emitting all of the items emitted by corresponding Observables
mergeMap( )
by wrapping them in Notification objects
materialize( )
which I can then unwrap again with
dematerialize( )
I want to ignore all items emitted by an Observable and only pass along its completed/error notification
ignoreElements( )
I want to mirror an Observable but prefix items to its sequence
obtained from an Array or Iterable
startWith(values)
obtained from an Observable
startWith(values)
passed as parameters to the operator
startWith(…)
only if its sequence is empty
defaultIfEmpty(…)
I want to collect items from an Observable and reemit them as buffers of items
with a maximum number of items per buffer
buffer(count)
and starting every n items
buffer(count,skip)
each time a second Observable emits an item
buffer(boundary)
with buffers given an initial capacity for efficiency reasons
buffer(boundary,initialCapacity)
where that second Observable is returned from a function I supply
buffer(bufferClosingSelector)
and operates on the emission of a third Observable that opens the buffer
buffer(bufferOpenings,bufferClosingSelector)
at periodic intervals
buffer(timespan,unit)
or when a certain maximum number of items fill the buffer
buffer(timespan,unit,count)
for a certain period of time after the interval begins
buffer(timespan,timeshift,unit)
containing only the last items emitted
that is, the last n items
takeLastBuffer(count)
emitted during a window of time before the Observable completed
takeLastBuffer(count,time,unit)
during a window of time before the Observable completed
takeLastBuffer(time,unit)
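The interaction between "a maximum number of items per buffer" and "starting every n items" in `buffer(count,skip)` can be sketched on a plain list. This is an illustrative model only, not RxJava's implementation. It assumes that trailing partial buffers are emitted when the source completes, and `BufferSketch` and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of buffer(count, skip) semantics; this is not RxJava code. */
final class BufferSketch {

    /**
     * Opens a new buffer every {@code skip} items; each buffer holds at most {@code count} items.
     * Assumption: buffers still open at the end of the source are emitted partially filled.
     */
    static <T> List<List<T>> buffer(List<T> source, int count, int skip) {
        if (count <= 0 || skip <= 0) {
            throw new IllegalArgumentException("count and skip must be positive");
        }
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < source.size(); start += skip) {
            int end = Math.min(start + count, source.size());
            out.add(new ArrayList<>(source.subList(start, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(buffer(items, 2, 2)); // skip == count: adjacent buffers
        System.out.println(buffer(items, 3, 1)); // skip < count: overlapping buffers
        System.out.println(buffer(items, 2, 3)); // skip > count: some items are dropped
    }
}
```

When `skip` equals `count` the result matches `buffer(count)`. A smaller `skip` produces overlapping buffers, and a larger one leaves gaps between them.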
I want to split one Observable into multiple Observables
with a maximum number of items per sub-Observable
window(count)
and starting every n items
window(count,skip)
each time a second Observable emits an item
window(boundary)
where that second Observable is returned from a function I supply
window(closingSelector)
and operates on the emission of a third Observable that starts the sub-Observable
window(windowOpenings,closingSelector)
at periodic intervals
window(timespan,unit)
or when a certain maximum number of items have been emitted on the sub-Observable
window(timespan,unit,count)
for a certain period of time after the interval begins
window(timespan,timeshift,unit)
so that similar items end up on the same Observable
groupBy( )
but periodically completing some of those Observables even if the source is not complete
groupByUntil(keySelector,durationSelector)
and transforming the items before emitting them on those Observables
groupByUntil(keySelector,valueSelector,durationSelector)
and then collecting similarly grouped Observables back together again
pivot( )
I want to retrieve from an Observable
the last item emitted before it completed
last( )
or a default item if none were emitted
lastOrDefault( )
that matches a predicate
last(predicate)
or a default item if none did
lastOrDefault(predicate)
the sole item it emitted
or an exception if it did not emit exactly one
single( )
or rather a default item if it did not emit any
singleOrDefault( )
that matches a predicate, or an exception if there was not exactly one such item
single(predicate)
or rather a default item if none did
singleOrDefault(predicate)
the first item it emitted
first( )
or a default item if none were emitted
firstOrDefault( )
that matches a predicate
first(predicate)
or a default item if none did
firstOrDefault(predicate)
I want to reemit only certain items from an Observable
by filtering out those that do not match some predicate
filter( )
by filtering out those that are not of a particular type
ofType( )
that is, only the first item
or notify of an error if the source is empty
first( )
or a default value if the source is empty
firstOrDefault(defaultValue)
that matches a predicate
takeFirst(predicate)
or notify of an error if none do
first(predicate)
or a default value if none do
firstOrDefault(defaultValue,predicate)
that is, only the first items
that is, the first n items
take(num)
that is, items emitted by the source during an initial period of time
take(time,unit)
that is, only the last item
last( )
that meets some predicate
last(predicate)
or a default item if none do
lastOrDefault(predicate)
or a default item if the source emits nothing
lastOrDefault( )
that is, only item n
elementAt( )
or a default value if there is no item n
elementAtOrDefault( )
that is, only those items after the first items
that is, after the first n items
skip(num)
or until one of those items matches a predicate
skipWhileWithIndex( )
that is, until one of those items matches a predicate
skipWhile( )
that is, after an initial period of time
skip(time,unit)
that is, after a second Observable emits an item
skipUntil( )
that is, those items except the last items
that is, except the last n items
skipLast( )
or until one of those items matches a predicate
takeWhileWithIndex( )
that is, until one of those items matches a predicate
takeWhile( )
that is, except items emitted during a period of time before the source completes
skipLast(time,unit)
that is, except items emitted after a second Observable emits an item
takeUntil( )
by sampling the Observable periodically
based on a timer
and emitting the most-recently emitted item in the period
sample(time,unit)
and emitting the first-emitted item in the period
throttleFirst(time,unit)
based on emissions from another Observable
and emitting the most-recently emitted item in the period
sample(sampler)
and emitting the first-emitted item in the period
throttleFirst(sampler)
by only emitting items that are not followed by other items within some duration
based on a timer
throttleWithTimeout(time,unit)
based on emissions from another Observable
debounce(debounceSelector)
by suppressing items that are duplicates of already-emitted items
distinct( )
according to a particular function
distinct(keySelector)
if they immediately follow the item they are duplicates of
distinctUntilChanged( )
according to a particular function
distinctUntilChanged(keySelector)
by delaying my subscription to it for some time after it begins emitting items
delaySubscription(delay,unit)
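The two duplicate-suppression branches above differ in what counts as a duplicate: `distinct` compares against every item already emitted, while `distinctUntilChanged` compares only against the immediately preceding item. The sketch below models both on a plain list. It is not RxJava code, and `DistinctSketch` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Plain-Java model of distinct vs. distinctUntilChanged; this is not RxJava code. */
final class DistinctSketch {

    /** Drops any item that has been seen anywhere earlier in the sequence. */
    static <T> List<T> distinct(List<T> source) {
        Set<T> seen = new HashSet<>();
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (seen.add(item)) { // add() returns false if the item was already present
                out.add(item);
            }
        }
        return out;
    }

    /** Drops an item only if it equals the item immediately before it. */
    static <T> List<T> distinctUntilChanged(List<T> source) {
        List<T> out = new ArrayList<>();
        boolean first = true;
        T previous = null;
        for (T item : source) {
            if (first || !Objects.equals(previous, item)) {
                out.add(item);
            }
            previous = item;
            first = false;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> items = Arrays.asList(1, 1, 2, 1, 3, 3);
        System.out.println(distinct(items));
        System.out.println(distinctUntilChanged(items));
    }
}
```

For the input 1, 1, 2, 1, 3, 3 the `distinct` model keeps 1, 2, 3, whereas the `distinctUntilChanged` model keeps 1, 2, 1, 3 because the second run of 1 does not directly follow another 1. The `distinct` model must remember every item it has seen, so its memory use grows with the number of distinct items.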
I want to reemit items from an Observable only on condition
that it was the first of a collection of Observables to emit an item
amb( )
that some predicate is true
ifThen( )
I want to evaluate the entire sequence of items emitted by an Observable
and emit a single boolean indicating if all of the items pass some test
all( )
and emit a single boolean indicating if any of the items pass some test
contains( )
and emit a single boolean indicating if the Observable emitted any items
exists( )
and emit a single boolean indicating if the Observable emitted no items
isEmpty( )
and emit a single boolean indicating if the sequence is identical to one emitted by a second Observable
sequenceEqual( )
and emit the average of all of their values
averageType( )
and emit the sum of all of their values
sumType( )
and emit a number indicating how many items were in the sequence
[long]count( )
and emit the item with the maximum value
max( )
according to some value-calculating function
maxBy( )
and emit the item with the minimum value
min( )
according to some value-calculating function
minBy( )
by applying an aggregation function to each item in turn and emitting the result
reduce( )
in the form of a single mutable data structure
collect( )
by applying a function to each item in the sequence, blocking until complete
forEach( )
I want to convert the entire sequence of items emitted by an Observable
into a Future
toFuture( )
into an Iterable
toIterable( )
that returns the item most recently emitted by the Observable
mostRecent( )
only if it has not previously returned that item
latest( )
that returns the next item when it is emitted by the Observable
latest( )
into an Iterator
getIterator( ) or toIterator( )
into a List
toList( )
sorted by some criterion
toSortedList( )
into a Map
toMap( )
that is also an ArrayList
toMultimap( )
I want an Observable to emit exactly one item
so I want it to notify of an error otherwise
single( )
so I want it to notify of an error if it emits more than one, or a default item if it emits none
singleOrDefault( )
that matches a predicate
so I want it to notify of an error otherwise
single(predicate)
so I want it to notify of an error if it emits more than one, or a default item if it emits none
singleOrDefault(predicate)
I want an operator to operate on a particular Scheduler
subscribeOn( )
doing its processing in parallel on multiple threads without making the resulting Observable poorly-behaved
parallel( )
when it notifies Observers
observeOn( )
I want an Observable to invoke a particular action
whenever it emits an item
doOnEach(action)
when it issues a completed notification
doOnCompleted(action)
when it issues an error notification
doOnError(action)
when it issues a completed or error notification
doOnTerminate(action)
after it has issued a completed or error notification
finallyDo(action)
whenever it emits an item or issues a completed/error notification
doOnEach(observer)
I want an Observable that will notify observers of an error
error( )
if a specified period of time elapses without it emitting an item
timeout(time,unit)
I want an Observable to recover gracefully
from a timeout by switching to a backup Observable
timeout(time,unit,fallback)
from an upstream error notification
by switching to a particular backup Observable
onErrorResumeNext(sequence)
but only if the error is an Exception
onExceptionResumeNext( )
by switching to a backup Observable returned from a function that is passed the error
onErrorResumeNext(throwable,function)
and by then continuing to observe the source Observable in spite of the error termination
onErrorFlatMap( )
by emitting a particular item and completing normally
onErrorReturn( )
by attempting to resubscribe to the upstream Observable
retry( )
a certain number of times
retry(count)
so long as a predicate remains true
retry(predicate)
from being potentially unserialized or otherwise poorly-behaved
serialize( )
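The "attempting to resubscribe ... a certain number of times" branch can be pictured as a bounded retry loop. The sketch below is a synchronous plain-Java analogy, not RxJava code: a `Supplier` stands in for subscribing to the source, and a thrown `RuntimeException` stands in for an error notification. It assumes that a retry count of n allows up to n resubscriptions after the first attempt. `RetrySketch` and its method are hypothetical names.

```java
import java.util.function.Supplier;

/** Plain-Java analogy for retry(count); this is not RxJava code. */
final class RetrySketch {

    /**
     * Runs {@code attempt} once, then up to {@code maxRetries} more times if it throws.
     * The last failure is rethrown when every attempt has failed.
     */
    static <T> T retry(Supplier<T> attempt, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return attempt.get(); // analogous to a subscription that completes normally
            } catch (RuntimeException e) {
                last = e; // analogous to an error notification; try again if retries remain
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Hypothetical flaky source: fails on the first two attempts, then succeeds.
        String result = retry(() -> {
            if (calls[0]++ < 2) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        }, 2);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Each retry re-runs the whole source from the beginning, which mirrors resubscription. Any side effects of the source are therefore repeated on every attempt.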
I want to create a resource that has the same lifespan as the Observable
using( )
I want to subscribe to an Observable and receive a Future that blocks until the Observable completes
forEachFuture( )
I want an Observable that does not start emitting items to subscribers until asked
publish( ) or multicast( )
and then only emits the last item in its sequence
publishLast( )
and then emits the complete sequence, even to those who subscribe after the sequence has begun
replay( )
but I want it to go away once all of its subscribers unsubscribe
refCount( ) or share( )
and then I want to ask it to start
connect( )
I want an Observable to retransmit items to observers who subscribe late
cache( )

ⓐ: this operator is part of the optional async-util package
ⓑ: this operator is part of the BlockingObservable subclass
ⓒ: this operator is part of the optional computation-expressions package
ⓜ: this operator is part of the optional math package
Ⓢ: a variant of this operator allows you to choose a particular Scheduler

I have omitted parameter names from some methods where they are not necessary to distinguish variants of the method. This page was inspired by the RxJS tables (static and instance) created by Paul Taylor.