Skip to content

Commit

Permalink
Merge pull request #149 from Cysharp/light-rp
Browse files Browse the repository at this point in the history
Reduce memory usage of ReactiveProperty
  • Loading branch information
neuecc authored Feb 29, 2024
2 parents e823bde + a6f8474 commit b7f79da
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 25 deletions.
91 changes: 66 additions & 25 deletions src/R3/ReactiveProperty.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ protected virtual void OnReceiveError(Exception exception) { }
public abstract void Dispose();
}

// almostly same code as Subject<T>.

// allow inherit

#if NET6_0_OR_GREATER
Expand All @@ -26,7 +24,7 @@ public class ReactiveProperty<T> : ReadOnlyReactiveProperty<T>, ISubject<T>
{
T currentValue;
IEqualityComparer<T>? equalityComparer;
FreeListCore<Subscription> list; // struct(array, int)
ObserverNode? root; // Root of LinkedList Node(Root.Previous is Last)
CompleteState completeState; // struct(int, IntPtr)

public IEqualityComparer<T>? EqualityComparer => equalityComparer;
Expand Down Expand Up @@ -69,8 +67,6 @@ public ReactiveProperty(T value)
public ReactiveProperty(T value, IEqualityComparer<T>? equalityComparer)
{
this.equalityComparer = equalityComparer;
this.list = new FreeListCore<Subscription>(this); // use self as gate(reduce memory usage), this is slightly dangerous so don't lock this in user.

OnValueChanging(ref value);
this.currentValue = value;
OnValueChanged(value);
Expand All @@ -79,7 +75,6 @@ public ReactiveProperty(T value, IEqualityComparer<T>? equalityComparer)
protected ReactiveProperty(T value, IEqualityComparer<T>? equalityComparer, bool callOnValueChangeInBaseConstructor)
{
this.equalityComparer = equalityComparer;
this.list = new FreeListCore<Subscription>(this);

if (callOnValueChangeInBaseConstructor)
{
Expand Down Expand Up @@ -115,9 +110,11 @@ void OnNextCore(T value)
{
if (completeState.IsCompleted) return;

foreach (var subscription in list.AsSpan())
var node = Volatile.Read(ref root);
while (node != null)
{
subscription?.observer.OnNext(value);
node.Observer.OnNext(value);
node = node.Next;
}
}

Expand All @@ -127,9 +124,11 @@ public void OnErrorResume(Exception error)

OnReceiveError(error);

foreach (var subscription in list.AsSpan())
var node = Volatile.Read(ref root);
while (node != null)
{
subscription?.observer.OnErrorResume(error);
node.Observer.OnErrorResume(error);
node = node.Next;
}
}

Expand All @@ -146,9 +145,11 @@ public void OnCompleted(Result result)
OnReceiveError(result.Exception);
}

foreach (var subscription in list.AsSpan())
var node = Volatile.Read(ref root);
while (node != null)
{
subscription?.observer.OnCompleted(result);
node.Observer.OnCompleted(result);
node = node.Next;
}
}

Expand All @@ -165,13 +166,13 @@ protected override IDisposable SubscribeCore(Observer<T> observer)
// raise latest value on subscribe(before add observer to list)
observer.OnNext(currentValue);

var subscription = new Subscription(this, observer); // create subscription and add observer to list.
var subscription = new ObserverNode(this, observer); // create subscription

// need to check called completed during adding
result = completeState.TryGetResult();
if (result != null)
{
subscription.observer.OnCompleted(result.Value);
subscription.Observer.OnCompleted(result.Value);
subscription.Dispose();
return Disposable.Empty;
}
Expand All @@ -191,14 +192,16 @@ public void Dispose(bool callOnCompleted)
if (callOnCompleted && !alreadyCompleted)
{
// not yet disposed so can call list iteration
foreach (var subscription in list.AsSpan())
var node = Volatile.Read(ref root);
while (node != null)
{
subscription?.observer.OnCompleted();
node.Observer.OnCompleted();
node = node.Next;
}
}

DisposeCore();
list.Dispose();
Volatile.Write(ref root, null);
}
}

Expand All @@ -209,26 +212,64 @@ protected virtual void DisposeCore() { }
return (currentValue == null) ? "(null)" : currentValue.ToString();
}

sealed class Subscription : IDisposable
sealed class ObserverNode : IDisposable
{
public readonly Observer<T> observer;
readonly int removeKey;
public readonly Observer<T> Observer;

ReactiveProperty<T>? parent;

public Subscription(ReactiveProperty<T> parent, Observer<T> observer)
public ObserverNode Previous { get; set; }
public ObserverNode? Next { get; set; }

public ObserverNode(ReactiveProperty<T> parent, Observer<T> observer)
{
this.parent = parent;
this.observer = observer;
parent.list.Add(this, out removeKey); // for the thread-safety, add and set removeKey in same lock.
this.Observer = observer;

if (parent.root == null)
{
Volatile.Write(ref parent.root, this);
this.Previous = this;
}
else
{
var last = parent.root.Previous;
last.Next = this;
this.Previous = last;
parent.root.Previous = this; // this as last
}
}

public void Dispose()
{
var p = Interlocked.Exchange(ref parent, null);
if (p == null) return;

// removeKey is index, will reuse if remove completed so only allows to call from here and must not call twice.
p.list.Remove(removeKey);
if (this.Previous == this) // single list
{
p.root = null;
return;
}

if (p.root == this)
{
var next = this.Next;
p.root = next;
if (next != null)
{
next.Previous = this.Previous;
}
}
else
{
var prev = this.Previous;
var next = this.Next;
prev.Next = next;
if (next != null)
{
next.Previous = prev;
}
}
}
}
}
Expand Down
57 changes: 57 additions & 0 deletions tests/R3.Tests/ReactivePropertyTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,61 @@ public void SubscribeAfterCompleted()
list.AssertIsCompleted();
list.AssertEqual(["foo"]);
}

// Node Check
[Fact]
public void CheckNode()
{
var rp = new ReactiveProperty<int>();


var list1 = rp.ToLiveList();

rp.Value = 1;

list1.AssertEqual([0, 1]);

list1.Dispose();

var list2 = rp.ToLiveList();

rp.Value = 2;

list2.AssertEqual([1, 2]);

var list3 = rp.ToLiveList();
var list4 = rp.ToLiveList();

// remove first

list2.Dispose();
rp.Value = 3;

list3.AssertEqual([2, 3]);
list4.AssertEqual([2, 3]);

var list5 = rp.ToLiveList();

// remove middle
list4.Dispose();

rp.Value = 4;

list3.AssertEqual([2, 3, 4]);
list5.AssertEqual([3, 4]);

// remove last
list5.Dispose();

rp.Value = 5;
list3.AssertEqual([2, 3, 4, 5]);

// remove single
list3.Dispose();

// subscribe once
var list6 = rp.ToLiveList();
rp.Value = 6;
list6.AssertEqual([5, 6]);
}
}

0 comments on commit b7f79da

Please sign in to comment.