Skip to content

Commit

Permalink
tech(observer): Use registry to observe arrays (#57)
Browse files Browse the repository at this point in the history
  • Loading branch information
pjechris authored Sep 4, 2023
1 parent cacb619 commit d2ab63e
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 77 deletions.
11 changes: 5 additions & 6 deletions Sources/CohesionKit/Identity/IdentityStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ public class IdentityMap {
private lazy var storeVisitor = IdentityMapStoreVisitor(identityMap: self)

/// Create a new IdentityMap instance optionally with a queue and a logger
/// - Parameter queue: the queue on which to receive updates. If nil identitymap will create its own. DO NOT USE
/// main thread as you may end up with data races
/// - Parameter queue: the queue on which to receive updates. If nil identitymap will create its own.
/// - Parameter logger: a logger to follow/debug identity internal state
public convenience init(queue: DispatchQueue? = nil, logger: Logger? = nil) {
self.init(registry: ObserverRegistry(queue: queue), logger: logger)
Expand Down Expand Up @@ -92,7 +91,7 @@ public class IdentityMap {

/// Store multiple entities at once
public func store<C: Collection>(entities: C, named: AliasKey<C>? = nil, modifiedAt: Stamp? = nil)
-> [EntityObserver<C.Element>] where C.Element: Identifiable {
-> EntityObserver<[C.Element]> where C.Element: Identifiable {
transaction {
let nodes = entities.map { nodeStore(entity: $0, modifiedAt: modifiedAt) }

Expand All @@ -101,13 +100,13 @@ public class IdentityMap {
logger?.didRegisterAlias(alias)
}

return nodes.map { EntityObserver(node: $0, registry: registry) }
return EntityObserver(nodes: nodes, registry: registry)
}
}

/// store multiple aggregates at once
public func store<C: Collection>(entities: C, named: AliasKey<C>? = nil, modifiedAt: Stamp? = nil)
-> [EntityObserver<C.Element>] where C.Element: Aggregate {
-> EntityObserver<[C.Element]> where C.Element: Aggregate {
transaction {
let nodes = entities.map { nodeStore(entity: $0, modifiedAt: modifiedAt) }

Expand All @@ -116,7 +115,7 @@ public class IdentityMap {
logger?.didRegisterAlias(alias)
}

return nodes.map { EntityObserver(node: $0, registry: registry) }
return EntityObserver(nodes: nodes, registry: registry)
}
}

Expand Down
17 changes: 7 additions & 10 deletions Sources/CohesionKit/Observer/AliasObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,15 @@ extension AliasObserver {
registry: ObserverRegistry,
onChange: @escaping OnChangeClosure
) -> Subscription {
// register for current alias value
var entityChangesSubscription: Subscription? = alias
.value
.map { node in registry.addObserver(node: node, onChange: onChange) }
.map { node in registry.addObserver(node: node, initial: true, onChange: onChange) }

// subscribe to alias changes
let subscription = alias.addObserver { node in
// update entity changes subscription
entityChangesSubscription = node.map { registry.addObserver(node: $0) { onChange($0) }}

registry.queue.async { onChange(node?.ref.value) }
entityChangesSubscription = node.map { registry.addObserver(node: $0, initial: true, onChange: onChange) }
}

return Subscription {
Expand All @@ -64,16 +63,15 @@ extension AliasObserver {
registry: ObserverRegistry,
onChange: @escaping OnChangeClosure
) -> Subscription where T == Array<E> {
// register for current alias value
var entitiesChangesSubscriptions: Subscription? = alias
.value
.map { nodes in nodes.map { EntityObserver(node: $0, registry: registry) } }?
.map { nodes in EntityObserver(nodes: nodes, registry: registry) }?
.observe(onChange: onChange)

// Subscribe to alias ref changes and to any changes made on the ref collection nodes.
let subscription = alias.addObserver { nodes in
let nodeObservers = nodes?.map { EntityObserver(node: $0, registry: registry) }

registry.queue.async { onChange(nodeObservers?.value) }
let nodeObservers = nodes.map { EntityObserver(nodes: $0, registry: registry) }

// update collection changes subscription
entitiesChangesSubscriptions = nodeObservers?.observe(onChange: onChange)
Expand All @@ -83,6 +81,5 @@ extension AliasObserver {
subscription.unsubscribe()
entitiesChangesSubscriptions?.unsubscribe()
}
}

}
}
24 changes: 17 additions & 7 deletions Sources/CohesionKit/Observer/EntityObserver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@ import Foundation

/// A type registering observers on a given entity from identity storage
public struct EntityObserver<T>: Observer {
let node: EntityNode<T>
let registry: ObserverRegistry
public typealias OnChange = (T) -> Void

public let value: T

let createObserver: (@escaping OnChange) -> Subscription

init(node: EntityNode<T>, registry: ObserverRegistry) {
self.registry = registry
self.node = node
self.value = node.value as! T
self.value = node.ref.value
self.createObserver = { onChange in
registry.addObserver(node: node, initial: true, onChange: onChange)
}
}

init<Element>(nodes: [EntityNode<Element>], registry: ObserverRegistry) where T == [Element] {
self.value = nodes.map(\.ref.value)
self.createObserver = { onChange in
registry.addObserver(nodes: nodes, initial: true, onChange: onChange)
}
}

public func observe(onChange: @escaping (T) -> Void) -> Subscription {
registry.addObserver(node: node, initial: true, onChange: onChange)
public func observe(onChange: @escaping OnChange) -> Subscription {
createObserver(onChange)
}
}
23 changes: 2 additions & 21 deletions Sources/CohesionKit/Observer/Observer.swift
Original file line number Diff line number Diff line change
@@ -1,34 +1,15 @@
/// A protocol allowing to observe a value returned from the `IdentityMap`
public protocol Observer {
associatedtype T

/// The value at the time the observer creation.
/// If you want **realtime** value use `observe to get notified of changes
var value: T { get }

/// Add an observer being notified when entity change.
/// Alternatively you can use `asPublisher` to observe using Combine.
/// - Parameter onChange: a closure called when value changed
/// - Returns: a subscription to cancel observation. Observation is automatically cancelled if subscription is deinit.
/// As long as the subscription is alived the entity should be kept in `IdentityMap`.
func observe(onChange: @escaping (T) -> Void) -> Subscription
}

extension Array: Observer where Element: Observer {
public var value: [Element.T] { map(\.value) }

public func observe(onChange: @escaping ([Element.T]) -> Void) -> Subscription {
var value = value

let subscriptions = indices.map { index in
self[index].observe {
value[index] = $0
onChange(value)
}
}

return Subscription {
subscriptions.forEach { $0.unsubscribe() }
}
}
}
100 changes: 76 additions & 24 deletions Sources/CohesionKit/Observer/ObserverRegistry.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,60 @@ import Foundation
/// Registers observers associated to an ``EntityNode``.
/// The registry will handle notifying observers when a node is marked as changed
class ObserverRegistry {
typealias Observer = (Any) -> Void
private typealias ObserverID = Int
private typealias Hash = Int

let queue: DispatchQueue
/// registered observers
private var observers: [Hash: [ObserverID: Observer]] = [:]
/// next available id for an observer
private var nextObserverID: ObserverID = 0
/// registered observer handlers
private var handlers: [Hash: Set<Handler>] = [:]
/// nodes waiting for notifiying their observes about changes
private var pendingChanges: [Hash: AnyWeak] = [:]

init(queue: DispatchQueue? = nil) {
self.queue = queue ?? DispatchQueue(label: "com.cohesionkit.registry")
self.queue = queue ?? DispatchQueue.main
}

/// register an observer to observe changes on an entity node. Everytime `ObserverRegistry` is notified about changes
/// to this node `onChange` will be called.
func addObserver<T>(node: EntityNode<T>, initial: Bool = false, onChange: @escaping (T) -> Void) -> Subscription {
let observerID = generateID()
let handler = Handler { onChange($0.ref.value) }

observers[node.hashValue, default: [:]][observerID] = {
guard let newValue = $0 as? EntityNode<T> else {
return
if initial {
if queue == DispatchQueue.main && Thread.isMainThread {
onChange(node.ref.value)
}
else {
queue.sync {
onChange(node.ref.value)
}
}
}

onChange(newValue.ref.value)
return subscribeHandler(handler, for: node)
}

/// Add an observer handler to multiple nodes.
/// Note that the same handler will be added to each nodes. But it should get notified only once per transaction
func addObserver<T>(nodes: [EntityNode<T>], initial: Bool = false, onChange: @escaping ([T]) -> Void) -> Subscription {
let handler = Handler { (_: EntityNode<T>) in
// use last value from nodes
onChange(nodes.map(\.ref.value))
}

if initial {
if queue == DispatchQueue.main && Thread.isMainThread {
onChange(node.ref.value)
onChange(nodes.map(\.ref.value))
}
else {
queue.sync {
onChange(node.ref.value)
onChange(nodes.map(\.ref.value))
}
}
}

// subscription keeps a strong ref to node, avoiding it from being released somehow while suscription is running
return Subscription { [node] in
self.observers[node.hashValue]?.removeValue(forKey: observerID)
let subscriptions = nodes.map { node in subscribeHandler(handler, for: node) }

return Subscription {
subscriptions.forEach { $0.unsubscribe() }
}
}

Expand All @@ -61,26 +72,67 @@ class ObserverRegistry {
/// Notify observers of all queued changes. Once notified pending changes are cleared out.
func postChanges() {
let changes = pendingChanges
let observers = self.observers
let handlers = self.handlers
var executedHandlers: Set<Handler> = []

self.pendingChanges = [:]

queue.async { [unowned self] in
queue.async {
for (hashKey, weakNode) in changes {
// node was released: no one to notify
guard let node = weakNode.unwrap() else {
continue
}

observers[hashKey]?.forEach { (_, observer) in
observer(node)
for handler in handlers[hashKey] ?? [] {
// if some handlers are used multiple times (like for collections), make sure we don't notify them multiple times
guard !executedHandlers.contains(handler) else {
continue
}

handler(node)
executedHandlers.insert(handler)
}
}
}
}

private func generateID() -> ObserverID {
defer { nextObserverID &+= 1 }
return nextObserverID
private func subscribeHandler<T>(_ handler: Handler, for node: EntityNode<T>) -> Subscription {
handlers[node.hashValue, default: []].insert(handler)

// subscription keeps a strong ref to node, avoiding it from being released somehow while suscription is running
return Subscription { [node] in
self.handlers[node.hashValue]?.remove(handler)
}
}
}

extension ObserverRegistry {
/// Handle observation for a given node
class Handler: Hashable {
let executor: (Any) -> Void

init<T>(executor: @escaping (EntityNode<T>) -> Void) {
self.executor = {
guard let entity = $0 as? EntityNode<T> else {
return
}

executor(entity)
}
}

/// execute the handler if `executeAtMost` does not exceed `executeCount`
func callAsFunction(_ value: Any) {
executor(value)
}

static func == (lhs: Handler, rhs: Handler) -> Bool {
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
}

func hash(into hasher: inout Hasher) {
hasher.combine(ObjectIdentifier(self))
}
}
}
14 changes: 13 additions & 1 deletion Tests/CohesionKitTests/IdentityMapTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,16 @@ extension IdentityMapTests {
let identityMap = IdentityMap(queue: .main)
let newEntity = SingleNodeFixture(id: 2)
let expectation = XCTestExpectation()
var firstDropped = false

_ = identityMap.store(entity: SingleNodeFixture(id: 1), named: .test, modifiedAt: 0)

let subscription = identityMap.find(named: .test).observe {
guard firstDropped else {
firstDropped = true
return
}

expectation.fulfill()
XCTAssertEqual($0, newEntity)
}
Expand All @@ -289,10 +295,16 @@ extension IdentityMapTests {
let identityMap = IdentityMap(queue: .main)
let entities = [SingleNodeFixture(id: 1)]
let expectation = XCTestExpectation()
var firstDropped = false

_ = identityMap.store(entities: [], named: .listOfNodes, modifiedAt: 0)

let subscription = identityMap.find(named: .listOfNodes).observe {
guard firstDropped else {
firstDropped = true
return
}

expectation.fulfill()
XCTAssertEqual($0, entities)
}
Expand All @@ -313,4 +325,4 @@ private extension AliasKey where T == SingleNodeFixture {

private extension AliasKey where T == [SingleNodeFixture] {
static let listOfNodes = AliasKey(named: "listOfNodes")
}
}
Loading

0 comments on commit d2ab63e

Please sign in to comment.