-
Notifications
You must be signed in to change notification settings - Fork 14
Ensure observable methods return replay as needed #164
base: main
Are you sure you want to change the base?
Conversation
wesbillman
commented
Jul 16, 2021
- Remove unnecessary usage of expectations in non-async tests
- Remove unnecessary usage of expectations in non-async tests
Feels like these should be made in |
@luispadron: I don't think that would be sufficient, because this issue affects all types that emit a value on subscription ( Variable(10)
.asObservable()
.map { 2 * $0 }
.subscribe(onNext: { print("Doubled: \($0)") }) prints nothing, because by the time the mapped observable is subscribed to, the Variable has already emitted its initial value. All that said, after thinking about this a little more, I wonder if using Replay for all transformations might be going too far in the other direction? E.g. consider this example: let stream = Observable<String>()
let identityStream = stream.map { $0 }
stream.on(.next("Nothing to see here"))
stream.on(.done)
stream.subscribe(onNext: { print("Value: \($0)") }) // prints nothing, as expected, since the stream was already ended
identityStream.subscribe(onNext: { print("Identity: \($0)") }) // prints "Nothing to see here" It seems pretty unintuitive to me that I wonder if a better approach would be to have transformations like extension Observable {
class Mapped<U, O: ObservableType>: Observable<U> {
let source: O
let transform: (O.T) -> U
init(_ source: O, transform: @escaping (O.T) -> U) {
self.source = source
self.transform = transform
}
override func subscribe(queue: DispatchQueue? = nil, onNext: ((U) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onDone: (() -> Void)? = nil) -> Subscriber<U> {
let subscriber = Subscriber(queue: queue, observable: self, handler: createHandler(onNext: onNext, onError: onError, onDone: onDone))
source.subscribe(queue: queue, onNext: {
let u = self.transform($0)
self.notify(subscriber: subscriber, event: .next(u))
}, onError: {
self.notify(subscriber: subscriber, event: .error($0))
}, onDone: {
self.notify(subscriber: subscriber, event: .done)
})
return subscriber
}
}
public func map<U>(_ transform: @escaping (T) -> U) -> Observable<U> {
Mapped(self, transform: transform)
}
} This seems a little more intuitive to me, because it means that a complex stream with multiple transformations wouldn't be kicked off until |
Yeah great thoughts and discussion @chris-lapilla, totally agree that observable shouldn't guarantee anything about a buffer like I think the solution you proposed is the correct way to go about this, this is similar to how combine does this as well, with their many different Publishers. However it would require more work and be a fairly large change, but if we plan to keep supporting snail this should be prioritized, as well as the memory leak issues. Fundamentally, snail just doesn't behave like any standard implementation of reactive streams, and it's a learning experience for the team that we should prefer standard solutions over these. |
We can also look at the implementations for RxSwift and Combine. Which is related to what @chris-lapilla proposed: final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Transform = (SourceType) throws -> ResultType
typealias ResultType = Observer.Element
typealias Element = SourceType
private let transform: Transform
init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
self.transform = transform
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<SourceType>) {
switch event {
case .next(let element):
do {
let mappedElement = try self.transform(element)
self.forwardOn(.next(mappedElement))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
case .error(let error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.completed)
self.dispose()
}
}
} |