Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add asyncStream and asyncThrowingStream for Signal and SignalProducer #847

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open
4 changes: 3 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ name: Test
jobs:
test:
name: Test
runs-on: macos-latest
runs-on: macos-11
env:
DEVELOPER_DIR: /Applications/Xcode_13.2.app/Contents/Developer
strategy:
fail-fast: false
matrix:
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

# 6.7.0
# 6.7.0-rc1
1. Add Swift Concurrency extensions `asyncStream` and `asyncThrowingStream` to `Signal` and `SignalProducer` (#847)

1. New operator `SignalProducer.Type.interval(_:interval:on:)` for emitting elements from a given sequence regularly. (#810, kudos to @mluisbrown)

Expand Down
123 changes: 48 additions & 75 deletions ReactiveSwift.xcodeproj/project.pbxproj

Large diffs are not rendered by default.

52 changes: 52 additions & 0 deletions Sources/Signal+SwiftConcurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// Signal+SwiftConcurrency.swift
// ReactiveSwift
//
// Created by Marco Cancellieri on 2021-11-11.
// Copyright (c) 2021 GitHub. All rights reserved.
//
#if compiler(>=5.5) && canImport(_Concurrency)
import Foundation

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension Signal {
public var asyncThrowingStream: AsyncThrowingStream<Value, Swift.Error> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should use AsyncStream<Result<Value, Error>> to maintain the error type.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump

AsyncThrowingStream<Value, Swift.Error> { continuation in
let disposable = observe { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the interrupted event throw a swift CancellationError?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point

continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
}
continuation.onTermination = { @Sendable termination in
disposable?.dispose()
}
}
}
}

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension Signal where Error == Never {
public var asyncStream: AsyncStream<Value> {
AsyncStream<Value> { continuation in
let disposable = observe { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed:
fatalError("Never is impossible to construct")
}
}
continuation.onTermination = { @Sendable termination in
disposable?.dispose()
}
}
}
}
#endif
52 changes: 52 additions & 0 deletions Sources/SignalProducer+SwiftConcurrency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
//
// SignalProducer+SwiftConcurrency.swift
// ReactiveSwift
//
// Created by Marco Cancellieri on 2021-11-11.
// Copyright (c) 2021 GitHub. All rights reserved.
//
#if compiler(>=5.5) && canImport(_Concurrency)
Marcocanc marked this conversation as resolved.
Show resolved Hide resolved
import Foundation

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension SignalProducer {
public var asyncThrowingStream: AsyncThrowingStream<Value, Swift.Error> {
AsyncThrowingStream<Value, Swift.Error> { continuation in
let disposable = start { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed(let error):
continuation.finish(throwing: error)
}
}
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
}

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
extension SignalProducer where Error == Never {
public var asyncStream: AsyncStream<Value> {
AsyncStream<Value> { continuation in
let disposable = start { event in
switch event {
case .value(let value):
continuation.yield(value)
case .completed, .interrupted:
continuation.finish()
case .failed:
fatalError("Never is impossible to construct")
}
}
continuation.onTermination = { @Sendable _ in
disposable.dispose()
}
}
}
}
#endif
129 changes: 129 additions & 0 deletions Tests/ReactiveSwiftTests/SwiftConcurrencyTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
//
// SwiftConcurrencyTests.swift
// ReactiveSwift
//
// Created by Marco Cancellieri on 2021-11-11.
// Copyright (c) 2021 GitHub. All rights reserved.
//

#if compiler(>=5.5) && canImport(_Concurrency)
import Foundation
import ReactiveSwift
import XCTest

@available(macOS 10.15, iOS 13, watchOS 6, tvOS 13, macCatalyst 13, *)
class SwiftConcurrencyTests: XCTestCase {
func testValuesAsyncSignalProducer() async {
let values = [1,2,3]
var sum = 0
let asyncStream = SignalProducer(values).asyncStream
for await number in asyncStream {
sum += number
}
XCTAssertEqual(sum, 6)
}

func testValuesAsyncThrowingSignalProducer() async throws {
let values = [1,2,3]
var sum = 0
let asyncStream = SignalProducer(values).asyncThrowingStream
for try await number in asyncStream {
sum += number
}
XCTAssertEqual(sum, 6)
}

func testCompleteAsyncSignalProducer() async {
let asyncStream = SignalProducer<String, Never>.empty.asyncStream
let first = await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testCompleteAsyncThrowingSignalProducer() async throws {
let asyncStream = SignalProducer<String, Error>.empty.asyncThrowingStream
let first = try await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testErrorSignalProducer() async {
let error = NSError(domain: "domain", code: 0, userInfo: nil)
let asyncStream = SignalProducer<String, Error>(error: error).asyncThrowingStream
await XCTAssertThrowsError(try await asyncStream.first(where: { _ in true }))
}

func testValuesAsyncSignal() async {
let signal = Signal<Int, Never> { observer, _ in
DispatchQueue.main.async {
for number in [1, 2, 3] {
observer.send(value: number)
}
observer.sendCompleted()
}
}
var sum = 0
let asyncStream = signal.asyncStream
for await number in asyncStream {
sum += number
}
XCTAssertEqual(sum, 6)
}

func testValuesAsyncThrowingSignal() async throws {
let signal = Signal<Int, Never> { observer, _ in
DispatchQueue.main.async {
for number in [1, 2, 3] {
observer.send(value: number)
}
observer.sendCompleted()
}
}
var sum = 0
let asyncStream = signal.asyncThrowingStream
for try await number in asyncStream {
sum += number
}
XCTAssertEqual(sum, 6)
}

func testCompleteAsyncSignal() async {
let asyncStream = Signal<String, Never>.empty.asyncStream
let first = await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testCompleteAsyncThrowingSignal() async throws {
let asyncStream = Signal<String, Error>.empty.asyncThrowingStream
let first = try await asyncStream.first(where: { _ in true })
XCTAssertEqual(first, nil)
}

func testErrorSignal() async {
let error = NSError(domain: "domain", code: 0, userInfo: nil)
let signal = Signal<String, Error> { observer, _ in
DispatchQueue.main.async {
observer.send(error: error)
}
}
let asyncStream = signal.asyncThrowingStream
await XCTAssertThrowsError(try await asyncStream.first(where: { _ in true }))
}
}
// Extension to allow Throw assertion for async expressions
@available(macOS 12, iOS 15, watchOS 8, tvOS 15, macCatalyst 15, *)
Marcocanc marked this conversation as resolved.
Show resolved Hide resolved
fileprivate extension XCTest {
func XCTAssertThrowsError<T: Sendable>(
_ expression: @autoclosure () async throws -> T,
_ message: @autoclosure () -> String = "",
file: StaticString = #filePath,
line: UInt = #line,
_ errorHandler: (_ error: Error) -> Void = { _ in }
) async {
do {
_ = try await expression()
XCTFail(message(), file: file, line: line)
} catch {
errorHandler(error)
}
}
}
#endif