concurrency research
Swift 新并发框架之 async/await
将回调改写成 async 函数
提供2组函数一个抛出异常,一个不抛出异常
public func withCheckedContinuation<T>(
function: String = #function,
_ body: (CheckedContinuation<T, Never>) -> Void
) async -> T
public func withCheckedThrowingContinuation<T>(
function: String = #function,
_ body: (CheckedContinuation<T, Error>) -> Void
) async throws -> T
[理解]
await 暂停的是方法,而不是执行方法的线程;
await 暂停点前后可能会发生线程切换。
await 之所以称为『 潜在 』暂停点,而不是暂停点,是因为并不是所有的 await 都会暂停,只有遇到类似 IO、手动起子线程等情况时才会暂停当前调用栈的运行。
如果 await后没有异步操作,会立刻返回结果。
[代码解析]
asyncConnect把connect方法改写成异步函数, transport.connect()调用之后开始真正的暂停,因为transport.connect()是子线程异步操作
let c = try await MSession.default.asyncConnect()
public func connect(completionHandler: @escaping ConnectedCallback) {
guard !isConnected else {
completionHandler(.success(nil))
return
}
guard !isConnecting else {
connectedCallBackList.append(completionHandler)
return
}
connectedCallBackList.append(completionHandler)
transport.connect()
isConnecting = true
}
public func asyncConnect() async throws -> Swift.Result<[String : String]?, MError> {
return try await withCheckedThrowingContinuation { continuation in
connect { result in
continuation.resume(with: .success(result))
}
}
}
Swift 新并发框架之 Task
在程序当中调用异步函数 Task
[理解]
Task 是线程的高级抽象,可以理解为创建一个线程Task,在这个线程上执行异步任务,就是Task的闭包里执行的任务在这个线程上。
如果是在 actor 方法中调用 Task.init 的,则 Task closure 将成为 actor-isolated。Task继承这个actor环境, Task里面异步任务挂起和暂停的地方都在这个actor 线程上
[代码解析]
因为这类是@MainActor,所以Task的环境继承mainactor,异步任务挂起和恢复都在主线程,异步任务恢复之后可以访问mainactor的isolated属性
总结一下,Task在actor内会继承这个actor环境,Task里面的异步任务挂起和恢复都在这个actor线程内,也可以访问这个actor的isolated属性
@MainActor
class RateDataSource: ObservableObject {
weak var delegate : RateModelDelegate? = nil
var storage = Set<AnyCancellable>()
@Published var rates: [QuoteV1] = []
func loadData() {
WebSocketPusher.sharedInstance.quoteSubject.sink { error in
} receiveValue: { [weak self] quotes in
dump("push quotes is \(quotes)")
self?.onQuotesService(quotes: quotes, first: false)
}.store(in: &self.storage)
Task {
// 异步任务挂起的地方在主线程,因为这个类是mainactor
guard let contracts = await MarketRequest().asyncSend().success?.data?.contracts else {
return
}
// 异步任务恢复的地方在主线程,因为这个类是mainactor
let ids = contracts.map({ c in return c.id_p })
let quote = await SubscribeQuoteRequest(ids).asyncSend().success?.data?.quotes
onQuotesService(quotes: quote)
dump("Subscribe quotes is \(String(describing: quote))")
}
}
func onQuotesService(quotes: [QuoteV1]?, first: Bool = true) {
guard let q = quotes else {
return
}
if first {
self.rates = q
print("first rates count is \(rates.count)")
} else {
self.rates.mergeWithOrdering(mergeWith: q, uniquelyKeyedBy: \.contractId)
print("merge 0--- rates count is \(rates.count)")
}
}
func unload() {
storage.removeAll()
}
}
Swift 新并发框架之 actor
GlobalActor 和异步函数的调度
[理解]
Actor 代表一组在并发环境下可以安全访问的(可变)状态;
Actor 通过所谓数据隔离 (Actor isolation) 的方式确保数据安全,其实现原理是 Actor 内部维护了一个串行队列 (mailbox),所有涉及数据安全的外部调用都要入队,即它们都是串行执行的。
[代码解析]
自己实现一个actor这个actor的job都在穿行队列dispatcher执行。
@globalActor actor SerialActor: GlobalActor {
typealias ActorType = SerialActor
static let shared: SerialActor = SerialActor()
private static let _sharedExecutor = SyncExectuor()
static let sharedUnownedExecutor: UnownedSerialExecutor = _sharedExecutor.asUnownedSerialExecutor()
let unownedExecutor: UnownedSerialExecutor = sharedUnownedExecutor
}
final class SyncExectuor: SerialExecutor {
private static let dispatcher: DispatchQueue = DispatchQueue(label: "momiji.session.actior")
func enqueue(_ job: UnownedJob) {
print("enqueue")
SyncExectuor.dispatcher.async {
job._runSynchronously(on: self.asUnownedSerialExecutor())
}
}
func asUnownedSerialExecutor() -> UnownedSerialExecutor {
UnownedSerialExecutor(ordinary: self)
}
}
[理解]
Sendable closure 是不能捕获 actor-isolated 属性,否则报错: Actor-isolated property 'x' can not be referenced from a Sendable closure。
但 Task closure 是个例外,因为它本身也是 actor-isolated,所以下面的代码不会报错:
public actor TestActor {
var value: Int = 0
func testTask() {
Task {
value = 1
}
}
}
Actor 模型 Actor 模型是为了解决多线程访问共享数据造成的数据竞争问题,它将数据隔离在 Actor 外,从外部访问 Actor 数据时(我们称之为隔离域外),只能够异步地访问,并且 Actor 内部有串行执行器来保证访问是串行的。 更多内容可以参考 WWDC21 Session Protect mutable state with Swift actors 和对应的内参文章 【WWDC21 10133】并发编程新利器 - Actor。
Main Actor由于没有派发线程的概念,它可以把耗时任务交给其他普通的 Actor 执行 普通Actor会开辟一个异步线程,线程里面的任务是串行执行,可以参考自定义actor。
更多内容可以参考 WWDC21 Session Protect mutable state with Swift actors 和对应的内参文章 【WWDC21 10133】并发编程新利器 - Actor。A 更多内容可以参考 WWDC21 Session Protect mutable state with Swift actors 和对应的内参文章 【WWDC21 10133】并发编程新利器 - Actor。
Swift Concurrency – Things They Don’t Tell You
Swift 中的 Async/Await ——代码实例详解
Swift AsyncSequence — 代码实例详解
Limit Swift Concurrency's cooperative pool
3 mistakes to avoid with async / await
MainActor usage in Swift explained to dispatch to the main thread
Actors in Swift: how to use and prevent data races
How async/await works internally in Swift
Concurrency Step-by-Step: Reading from Storage
Concurrency Step-by-Step: Reading from Storage