Skip to content

Commit

Permalink
Merge pull request #252 from ejensen/thread-safety
Browse files Browse the repository at this point in the history
Prevent crashes caused by threading issues
  • Loading branch information
dsrees authored Jan 16, 2024
2 parents 374f9c3 + 51f4647 commit edfb5e3
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 40 deletions.
10 changes: 6 additions & 4 deletions Sources/SwiftPhoenixClient/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class Channel {
var state: ChannelState

/// Collection of event bindings
var syncBindingsDel: SynchronizedArray<Binding>
let syncBindingsDel: SynchronizedArray<Binding>

/// Tracks event binding ref counters
var bindingRef: Int
Expand Down Expand Up @@ -572,9 +572,11 @@ public class Channel {
func trigger(_ message: Message) {
let handledMessage = self.onMessage(message)

self.syncBindingsDel
.filter( { return $0.event == message.event } )
.forEach( { $0.callback.call(handledMessage) } )
self.syncBindingsDel.forEach { binding in
if binding.event == message.event {
binding.callback.call(handledMessage)
}
}
}

/// Triggers an event to the correct event bindings created by
Expand Down
44 changes: 22 additions & 22 deletions Sources/SwiftPhoenixClient/Socket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ public typealias PayloadClosure = () -> Payload?

/// Struct that gathers callbacks assigned to the Socket
struct StateChangeCallbacks {
var open: SynchronizedArray<(ref: String, callback: Delegated<URLResponse?, Void>)> = .init()
var close: SynchronizedArray<(ref: String, callback: Delegated<(Int, String?), Void>)> = .init()
var error: SynchronizedArray<(ref: String, callback: Delegated<(Error, URLResponse?), Void>)> = .init()
var message: SynchronizedArray<(ref: String, callback: Delegated<Message, Void>)> = .init()
let open: SynchronizedArray<(ref: String, callback: Delegated<URLResponse?, Void>)> = .init()
let close: SynchronizedArray<(ref: String, callback: Delegated<(Int, String?), Void>)> = .init()
let error: SynchronizedArray<(ref: String, callback: Delegated<(Error, URLResponse?), Void>)> = .init()
let message: SynchronizedArray<(ref: String, callback: Delegated<Message, Void>)> = .init()
}


Expand Down Expand Up @@ -140,14 +140,14 @@ public class Socket: PhoenixTransportDelegate {
// MARK: - Private Attributes
//----------------------------------------------------------------------
/// Callbacks for socket state changes
var stateChangeCallbacks: StateChangeCallbacks = StateChangeCallbacks()
let stateChangeCallbacks: StateChangeCallbacks = StateChangeCallbacks()

/// Collection on channels created for the Socket
public internal(set) var channels: [Channel] = []

/// Buffers messages that need to be sent once the socket has connected. It is an array
/// of tuples, with the ref of the message to send and the callback that will send the message.
var sendBuffer: [(ref: String?, callback: () throws -> ())] = []
let sendBuffer = SynchronizedArray<(ref: String?, callback: () throws -> ())>()

/// Ref counter for messages
var ref: UInt64 = UInt64.min // 0 (max: 18,446,744,073,709,551,615)
Expand Down Expand Up @@ -337,7 +337,7 @@ public class Socket: PhoenixTransportDelegate {
var delegated = Delegated<URLResponse?, Void>()
delegated.manuallyDelegate(with: callback)

return self.append(callback: delegated, to: &self.stateChangeCallbacks.open)
return self.append(callback: delegated, to: self.stateChangeCallbacks.open)
}

/// Registers callbacks for connection open events. Automatically handles
Expand Down Expand Up @@ -374,7 +374,7 @@ public class Socket: PhoenixTransportDelegate {
var delegated = Delegated<URLResponse?, Void>()
delegated.delegate(to: owner, with: callback)

return self.append(callback: delegated, to: &self.stateChangeCallbacks.open)
return self.append(callback: delegated, to: self.stateChangeCallbacks.open)
}

/// Registers callbacks for connection close events. Does not handle retain
Expand Down Expand Up @@ -407,7 +407,7 @@ public class Socket: PhoenixTransportDelegate {
var delegated = Delegated<(Int, String?), Void>()
delegated.manuallyDelegate(with: callback)

return self.append(callback: delegated, to: &self.stateChangeCallbacks.close)
return self.append(callback: delegated, to: self.stateChangeCallbacks.close)
}

/// Registers callbacks for connection close events. Automatically handles
Expand Down Expand Up @@ -444,7 +444,7 @@ public class Socket: PhoenixTransportDelegate {
var delegated = Delegated<(Int, String?), Void>()
delegated.delegate(to: owner, with: callback)

return self.append(callback: delegated, to: &self.stateChangeCallbacks.close)
return self.append(callback: delegated, to: self.stateChangeCallbacks.close)
}

/// Registers callbacks for connection error events. Does not handle retain
Expand All @@ -462,7 +462,7 @@ public class Socket: PhoenixTransportDelegate {
var delegated = Delegated<(Error, URLResponse?), Void>()
delegated.manuallyDelegate(with: callback)

return self.append(callback: delegated, to: &self.stateChangeCallbacks.error)
return self.append(callback: delegated, to: self.stateChangeCallbacks.error)
}

/// Registers callbacks for connection error events. Automatically handles
Expand All @@ -482,7 +482,7 @@ public class Socket: PhoenixTransportDelegate {
var delegated = Delegated<(Error, URLResponse?), Void>()
delegated.delegate(to: owner, with: callback)

return self.append(callback: delegated, to: &self.stateChangeCallbacks.error)
return self.append(callback: delegated, to: self.stateChangeCallbacks.error)
}

/// Registers callbacks for connection message events. Does not handle
Expand All @@ -501,7 +501,7 @@ public class Socket: PhoenixTransportDelegate {
var delegated = Delegated<Message, Void>()
delegated.manuallyDelegate(with: callback)

return self.append(callback: delegated, to: &self.stateChangeCallbacks.message)
return self.append(callback: delegated, to: self.stateChangeCallbacks.message)
}

/// Registers callbacks for connection message events. Automatically handles
Expand All @@ -521,10 +521,10 @@ public class Socket: PhoenixTransportDelegate {
var delegated = Delegated<Message, Void>()
delegated.delegate(to: owner, with: callback)

return self.append(callback: delegated, to: &self.stateChangeCallbacks.message)
return self.append(callback: delegated, to: self.stateChangeCallbacks.message)
}

private func append<T>(callback: T, to array: inout SynchronizedArray<(ref: String, callback: T)>) -> String {
private func append<T>(callback: T, to array: SynchronizedArray<(ref: String, callback: T)>) -> String {
let ref = makeRef()
array.append((ref, callback))
return ref
Expand Down Expand Up @@ -582,10 +582,10 @@ public class Socket: PhoenixTransportDelegate {
///
/// - Parameter refs: List of refs returned by calls to `onOpen`, `onClose`, etc
public func off(_ refs: [String]) {
self.stateChangeCallbacks.open = self.stateChangeCallbacks.open.filter({ !refs.contains($0.ref) })
self.stateChangeCallbacks.close = self.stateChangeCallbacks.close.filter({ !refs.contains($0.ref) })
self.stateChangeCallbacks.error = self.stateChangeCallbacks.error.filter({ !refs.contains($0.ref) })
self.stateChangeCallbacks.message = self.stateChangeCallbacks.message.filter({ !refs.contains($0.ref) })
self.stateChangeCallbacks.open.removeAll { refs.contains($0.ref) }
self.stateChangeCallbacks.close.removeAll { refs.contains($0.ref) }
self.stateChangeCallbacks.error.removeAll { refs.contains($0.ref) }
self.stateChangeCallbacks.message.removeAll { refs.contains($0.ref) }
}


Expand Down Expand Up @@ -730,14 +730,14 @@ public class Socket: PhoenixTransportDelegate {

/// Send all messages that were buffered before the socket opened
internal func flushSendBuffer() {
guard isConnected && sendBuffer.count > 0 else { return }
guard isConnected else { return }
self.sendBuffer.forEach( { try? $0.callback() } )
self.sendBuffer = []
self.sendBuffer.removeAll()
}

/// Removes an item from the sendBuffer with the matching ref
internal func removeFromSendBuffer(ref: String) {
self.sendBuffer = self.sendBuffer.filter({ $0.ref != ref })
self.sendBuffer.removeAll { $0.ref == ref }
}

/// Builds a fully qualified socket `URL` from `endPoint` and `params`.
Expand Down
19 changes: 5 additions & 14 deletions Sources/SwiftPhoenixClient/SynchronizedArray.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@ import Foundation
/// A thread-safe array.
public class SynchronizedArray<Element> {
fileprivate let queue = DispatchQueue(label: "spc_sync_array", attributes: .concurrent)
fileprivate var array = [Element]()

public init() { }

public convenience init(_ array: [Element]) {
self.init()
fileprivate var array: [Element]

public init(_ array: [Element] = []) {
self.array = array
}

Expand All @@ -26,15 +23,9 @@ public class SynchronizedArray<Element> {
}
}

func filter(_ isIncluded: @escaping (Element) -> Bool) -> SynchronizedArray {
var result: SynchronizedArray?
queue.sync { result = SynchronizedArray(self.array.filter(isIncluded)) }
return result!
}

func forEach(_ body: (Element) -> Void) {
queue.sync { self.array.forEach(body) }
}
queue.sync { self.array }.forEach(body)
}

func removeAll() {
queue.async(flags: .barrier) {
Expand Down

0 comments on commit edfb5e3

Please sign in to comment.