diff --git a/src/R3/ReactiveProperty.cs b/src/R3/ReactiveProperty.cs index c8b9c8f4..9af158b4 100644 --- a/src/R3/ReactiveProperty.cs +++ b/src/R3/ReactiveProperty.cs @@ -15,8 +15,6 @@ protected virtual void OnReceiveError(Exception exception) { } public abstract void Dispose(); } -// almostly same code as Subject. - // allow inherit #if NET6_0_OR_GREATER @@ -26,7 +24,7 @@ public class ReactiveProperty : ReadOnlyReactiveProperty, ISubject { T currentValue; IEqualityComparer? equalityComparer; - FreeListCore list; // struct(array, int) + ObserverNode? root; // Root of LinkedList Node(Root.Previous is Last) CompleteState completeState; // struct(int, IntPtr) public IEqualityComparer? EqualityComparer => equalityComparer; @@ -69,8 +67,6 @@ public ReactiveProperty(T value) public ReactiveProperty(T value, IEqualityComparer? equalityComparer) { this.equalityComparer = equalityComparer; - this.list = new FreeListCore(this); // use self as gate(reduce memory usage), this is slightly dangerous so don't lock this in user. - OnValueChanging(ref value); this.currentValue = value; OnValueChanged(value); @@ -79,7 +75,6 @@ public ReactiveProperty(T value, IEqualityComparer? equalityComparer) protected ReactiveProperty(T value, IEqualityComparer? equalityComparer, bool callOnValueChangeInBaseConstructor) { this.equalityComparer = equalityComparer; - this.list = new FreeListCore(this); if (callOnValueChangeInBaseConstructor) { @@ -115,9 +110,11 @@ void OnNextCore(T value) { if (completeState.IsCompleted) return; - foreach (var subscription in list.AsSpan()) + var node = Volatile.Read(ref root); + while (node != null) { - subscription?.observer.OnNext(value); + node.Observer.OnNext(value); + node = node.Next; } } @@ -127,9 +124,11 @@ public void OnErrorResume(Exception error) OnReceiveError(error); - foreach (var subscription in list.AsSpan()) + var node = Volatile.Read(ref root); + while (node != null) { - subscription?.observer.OnErrorResume(error); + node.Observer.OnErrorResume(error); + node = node.Next; } } @@ -146,9 +145,11 @@ public void OnCompleted(Result result) OnReceiveError(result.Exception); } - foreach (var subscription in list.AsSpan()) + var node = Volatile.Read(ref root); + while (node != null) { - subscription?.observer.OnCompleted(result); + node.Observer.OnCompleted(result); + node = node.Next; } } @@ -165,13 +166,13 @@ protected override IDisposable SubscribeCore(Observer observer) // raise latest value on subscribe(before add observer to list) observer.OnNext(currentValue); - var subscription = new Subscription(this, observer); // create subscription and add observer to list. + var subscription = new ObserverNode(this, observer); // create subscription // need to check called completed during adding result = completeState.TryGetResult(); if (result != null) { - subscription.observer.OnCompleted(result.Value); + subscription.Observer.OnCompleted(result.Value); subscription.Dispose(); return Disposable.Empty; } @@ -191,14 +192,16 @@ public void Dispose(bool callOnCompleted) if (callOnCompleted && !alreadyCompleted) { // not yet disposed so can call list iteration - foreach (var subscription in list.AsSpan()) + var node = Volatile.Read(ref root); + while (node != null) { - subscription?.observer.OnCompleted(); + node.Observer.OnCompleted(); + node = node.Next; } } DisposeCore(); - list.Dispose(); + Volatile.Write(ref root, null); } } @@ -209,17 +212,32 @@ protected virtual void DisposeCore() { } return (currentValue == null) ? "(null)" : currentValue.ToString(); } - sealed class Subscription : IDisposable + sealed class ObserverNode : IDisposable { - public readonly Observer observer; - readonly int removeKey; + public readonly Observer Observer; + ReactiveProperty? parent; - public Subscription(ReactiveProperty parent, Observer observer) + public ObserverNode Previous { get; set; } + public ObserverNode? Next { get; set; } + + public ObserverNode(ReactiveProperty parent, Observer observer) { this.parent = parent; - this.observer = observer; - parent.list.Add(this, out removeKey); // for the thread-safety, add and set removeKey in same lock. + this.Observer = observer; + + if (parent.root == null) + { + Volatile.Write(ref parent.root, this); + this.Previous = this; + } + else + { + var last = parent.root.Previous; + last.Next = this; + this.Previous = last; + parent.root.Previous = this; // this as last + } } public void Dispose() @@ -227,8 +245,31 @@ public void Dispose() var p = Interlocked.Exchange(ref parent, null); if (p == null) return; - // removeKey is index, will reuse if remove completed so only allows to call from here and must not call twice. - p.list.Remove(removeKey); + if (this.Previous == this) // single list + { + p.root = null; + return; + } + + if (p.root == this) + { + var next = this.Next; + p.root = next; + if (next != null) + { + next.Previous = this.Previous; + } + } + else + { + var prev = this.Previous; + var next = this.Next; + prev.Next = next; + if (next != null) + { + next.Previous = prev; + } + } } } } diff --git a/tests/R3.Tests/ReactivePropertyTest.cs b/tests/R3.Tests/ReactivePropertyTest.cs index 0846105f..92c29bfb 100644 --- a/tests/R3.Tests/ReactivePropertyTest.cs +++ b/tests/R3.Tests/ReactivePropertyTest.cs @@ -49,4 +49,61 @@ public void SubscribeAfterCompleted() list.AssertIsCompleted(); list.AssertEqual(["foo"]); } + + // Node Check + [Fact] + public void CheckNode() + { + var rp = new ReactiveProperty(); + + + var list1 = rp.ToLiveList(); + + rp.Value = 1; + + list1.AssertEqual([0, 1]); + + list1.Dispose(); + + var list2 = rp.ToLiveList(); + + rp.Value = 2; + + list2.AssertEqual([1, 2]); + + var list3 = rp.ToLiveList(); + var list4 = rp.ToLiveList(); + + // remove first + + list2.Dispose(); + rp.Value = 3; + + list3.AssertEqual([2, 3]); + list4.AssertEqual([2, 3]); + + var list5 = rp.ToLiveList(); + + // remove middle + list4.Dispose(); + + rp.Value = 4; + + list3.AssertEqual([2, 3, 4]); + list5.AssertEqual([3, 4]); + + // remove last + list5.Dispose(); + + rp.Value = 5; + list3.AssertEqual([2, 3, 4, 5]); + + // remove single + list3.Dispose(); + + // subscribe once + var list6 = rp.ToLiveList(); + rp.Value = 6; + list6.AssertEqual([5, 6]); + } }