Skip to content

Commit

Permalink
Fix SerialDisposable, it affects for Observable<Observable>>.Switch, …
Browse files Browse the repository at this point in the history
…Enumerable<Observable>.Concat
  • Loading branch information
neuecc committed Mar 18, 2024
1 parent 7600200 commit 8482849
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 195 deletions.
194 changes: 12 additions & 182 deletions sandbox/ConsoleApp1/Program.cs
Original file line number Diff line number Diff line change
@@ -1,187 +1,17 @@
using Microsoft.Extensions.Time.Testing;
using R3;
using System.ComponentModel;
using System.ComponentModel.DataAnnotations;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Metadata;
using System.Threading.Channels;
using R3;

var subject = new Subject<Observable<int>>();

var p1 = new ReactiveProperty<int>();
var disposable = subject.Switch().Subscribe();

Console.WriteLine(p1.HasObservers);
var d = p1.Skip(1).Subscribe(x => Debug.Log("[P1]" + x));
var observable1 = new Subject<int>();
var observable2 = new Subject<int>();
var observable3 = new Subject<int>();
var observable4 = new Subject<int>();

subject.OnNext(observable1.Do(onDispose: () => Console.WriteLine("Dispose 1")));
subject.OnNext(observable2.Do(onDispose: () => Console.WriteLine("Dispose 2")));
subject.OnNext(observable3.Do(onDispose: () => Console.WriteLine("Dispose 3")));
subject.OnNext(observable4.Do(onDispose: () => Console.WriteLine("Dispose 4")));


var d1 = p1.Skip(1).Subscribe(x => Debug.Log("[P2]" + x));
var d2 = p1.Skip(1).Subscribe(x => Debug.Log("[P2]" + x));
d1.Dispose();
d2.Dispose();

var d3 = p1.Skip(1).Subscribe(x => Debug.Log("[P3]" + x)); // P3 is not raised

Console.WriteLine(p1.HasObservers);

p1.Value = 1;
p1.Value = 2;

d.Dispose();

Console.WriteLine(p1.HasObservers);

d3.Dispose();

Console.WriteLine(p1.HasObservers);


public static class Debug
{
public static void Log(string x)
{
Console.WriteLine(x);
}
}

public class Person : INotifyPropertyChanged
{
public event PropertyChangedEventHandler? PropertyChanged;

string name = default!;

public required string Name
{
get
{
return name;
}
set
{
name = value;
PropertyChanged?.Invoke(this, new PropertyChangedEventArgs("Name"));
}
}
}


internal static class ChannelUtility
{
static readonly UnboundedChannelOptions options = new UnboundedChannelOptions
{
SingleWriter = true, // in Rx operator, OnNext gurantees synchronous
SingleReader = true, // almostly uses single reader loop
AllowSynchronousContinuations = true // if false, uses TaskCreationOptions.RunContinuationsAsynchronously so avoid it.
};

internal static Channel<T> CreateSingleReadeWriterUnbounded<T>()
{
return Channel.CreateUnbounded<T>(options);
}
}


public sealed class ClampedReactiveProperty<T>(T initialValue, T min, T max)
: ReactiveProperty<T>(initialValue) where T : IComparable<T>
{
private static IComparer<T> Comparer { get; } = Comparer<T>.Default;

protected override void OnValueChanging(ref T value)
{
if (Comparer.Compare(value, min) < 0)
{
value = min;
}
else if (Comparer.Compare(value, max) > 0)
{
value = max;
}
}
}


public sealed class ClampedReactiveProperty2<T>
: ReactiveProperty<T> where T : IComparable<T>
{
private static IComparer<T> Comparer { get; } = Comparer<T>.Default;

readonly T min, max;

// callOnValueChangeInBaseConstructor to avoid OnValueChanging call before min, max set.
public ClampedReactiveProperty2(T initialValue, T min, T max)
: base(initialValue, EqualityComparer<T>.Default, callOnValueChangeInBaseConstructor: false)
{
this.min = min;
this.max = max;

// modify currentValue manually
OnValueChanging(ref GetValueRef());
}

protected override void OnValueChanging(ref T value)
{
if (Comparer.Compare(value, min) < 0)
{
value = min;
}
else if (Comparer.Compare(value, max) > 0)
{
value = max;
}
}
}





//[JsonConverter(typeof(IgnoreCaseStringReactivePropertyJsonConverter))]
public class IgnoreCaseStringReactiveProperty : ReactiveProperty<string>
{
public IgnoreCaseStringReactiveProperty(string value)
: base(value, StringComparer.OrdinalIgnoreCase)
{

}
}

internal class IgnoreCaseStringReactivePropertyJsonConverter : ReactivePropertyJsonConverter<string>
{
protected override ReactiveProperty<string> CreateReactiveProperty(string value)
{
return new IgnoreCaseStringReactiveProperty(value);
}
}

public class MySyncContext : SynchronizationContext
{
public MySyncContext()
{
}

public override void Post(SendOrPostCallback d, object? state)
{
base.Post(d, state);
}

public override void Send(SendOrPostCallback d, object? state)
{
base.Send(d, state);
}
}


class NewsViewModel
{
ReactiveProperty<NewsUiState> _uiState = new(new NewsUiState());
public ReadOnlyReactiveProperty<NewsUiState> UiState => _uiState;
}

class NewsUiState
{
}

disposable.Dispose();
29 changes: 16 additions & 13 deletions src/R3/SerialDisposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,25 @@ public IDisposable? Disposable
}
set
{
var field = Interlocked.CompareExchange(ref current, value, null);
if (field == null)
var field = Volatile.Read(ref current);
while (true)
{
// ok to set.
return;
}
if (field == DisposedSentinel.Instance)
{
// We've already been disposed, so dispose the value we've just been given.
value?.Dispose();
return;
}

if (field == DisposedSentinel.Instance)
{
// We've already been disposed, so dispose the value we've just been given.
value?.Dispose();
return;
}
var exchangedCurrent = Interlocked.CompareExchange(ref current, value, field);
if (exchangedCurrent == field)
{
exchangedCurrent?.Dispose();
return;
}

// otherwise, dispose previous disposable
field.Dispose();
field = exchangedCurrent;
}
}
}

Expand Down
32 changes: 32 additions & 0 deletions tests/R3.Tests/SerialDisposableTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
namespace R3.Tests;

public class SerialDisposableTest
{
[Fact]
public void Dispose()
{
var l = new List<int>();

var d = new SerialDisposableCore();
d.Disposable = Disposable.Create(() => l.Add(1));

l.Should().Equal([]);

d.Disposable = Disposable.Create(() => l.Add(2));
l.Should().Equal([1]);

d.Disposable = Disposable.Create(() => l.Add(3));
l.Should().Equal([1, 2]);

d.Disposable = Disposable.Create(() => l.Add(4));
l.Should().Equal([1, 2, 3]);

d.Dispose();

l.Should().Equal([1, 2, 3, 4]);

d.Disposable = Disposable.Create(() => l.Add(5));

l.Should().Equal([1, 2, 3, 4, 5]);
}
}

0 comments on commit 8482849

Please sign in to comment.