Skip to content

Commit

Permalink
Fixed #226
Browse files Browse the repository at this point in the history
  • Loading branch information
sakno committed Mar 4, 2024
1 parent 808e2ed commit bbe2617
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 36 deletions.
15 changes: 5 additions & 10 deletions src/DotNext/Func.cs
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,16 @@ public static Func<T> Constant<T>(T obj)
if (typeof(T) == typeof(bool))
return Unsafe.As<Func<T>>(Constant(Unsafe.As<T, bool>(ref obj)));

// cache nulls
if (obj is null)
return Default!;

// slow path - allocates a new delegate
unsafe
{
return DelegateHelpers.CreateDelegate<object?, T>(&ConstantCore, obj);
}

static T ConstantCore(object? obj) => (T)obj!;
return obj is null
? Default!
: obj.ConstantCore<T>;

static T? Default() => default;
}

private static T ConstantCore<T>(this object obj) => (T)obj;

private static Func<bool> Constant(bool value)
{
return value ? True : False;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ HttpMessageHandler IHostingContext.CreateHttpHandler()
public override async Task StartAsync(CancellationToken token)
{
configurator?.OnStart(this, metadata);
ConfigurationStorage.ActiveConfigurationChanged += GetConfigurationEventHandler(configurationEvents.Writer);
ConfigurationStorage.ActiveConfigurationChanged += configurationEvents.Writer.WriteConfigurationEvent;

if (coldStart)
{
Expand Down Expand Up @@ -193,7 +193,7 @@ async Task StopAsync()
{
configurator?.OnStop(this);
duplicationDetector.Trim(100);
ConfigurationStorage.ActiveConfigurationChanged -= GetConfigurationEventHandler(configurationEvents.Writer);
ConfigurationStorage.ActiveConfigurationChanged -= configurationEvents.Writer.WriteConfigurationEvent;
configurationEvents.Writer.TryComplete();
await pollingLoopTask.ConfigureAwait(false);
pollingLoopTask = Task.CompletedTask;
Expand All @@ -205,17 +205,6 @@ async Task StopAsync()
}
}

private static Func<UriEndPoint, bool, CancellationToken, ValueTask> GetConfigurationEventHandler(ChannelWriter<(UriEndPoint, bool)> writer)
{
unsafe
{
return DelegateHelpers.CreateDelegate<ChannelWriter<(UriEndPoint, bool)>, UriEndPoint, bool, CancellationToken, ValueTask>(&WriteConfigurationEvent, writer);
}

static ValueTask WriteConfigurationEvent(ChannelWriter<(UriEndPoint, bool)> writer, UriEndPoint address, bool isAdded, CancellationToken token)
=> writer.WriteAsync(new(address, isAdded), token);
}

/// <inheritdoc />
ISubscriber? IPeerMesh<ISubscriber>.TryGetPeer(EndPoint peer)
{
Expand Down Expand Up @@ -243,4 +232,10 @@ protected override void Dispose(bool disposing)

base.Dispose(disposing);
}
}

file static class RaftHttpClusterHelpers
{
internal static ValueTask WriteConfigurationEvent(this ChannelWriter<(UriEndPoint, bool)> writer, UriEndPoint address, bool isAdded, CancellationToken token)
=> writer.WriteAsync(new(address, isAdded), token);
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public RaftCluster(NodeConfiguration configuration)
/// <returns>The task representing asynchronous execution of the method.</returns>
public override async Task StartAsync(CancellationToken token = default)
{
ConfigurationStorage.ActiveConfigurationChanged += GetConfigurationEventHandler(configurationEvents.Writer);
ConfigurationStorage.ActiveConfigurationChanged += configurationEvents.Writer.WriteConfigurationEvent;

if (coldStart)
{
Expand Down Expand Up @@ -151,17 +151,6 @@ public override async Task StartAsync(CancellationToken token = default)
await announcer(LocalMemberAddress, metadata, token).ConfigureAwait(false);
}

private static Func<EndPoint, bool, CancellationToken, ValueTask> GetConfigurationEventHandler(ChannelWriter<(EndPoint, bool)> writer)
{
unsafe
{
return DelegateHelpers.CreateDelegate<ChannelWriter<(EndPoint, bool)>, EndPoint, bool, CancellationToken, ValueTask>(&WriteConfigurationEvent, writer);
}

static ValueTask WriteConfigurationEvent(ChannelWriter<(EndPoint, bool)> writer, EndPoint address, bool isAdded, CancellationToken token)
=> writer.WriteAsync(new(address, isAdded), token);
}

/// <inheritdoc />
protected override ValueTask<bool> DetectLocalMemberAsync(RaftClusterMember candidate, CancellationToken token)
=> new(EndPointComparer.Equals(LocalMemberAddress, candidate.EndPoint));
Expand All @@ -181,7 +170,7 @@ async Task StopAsync()
{
await (server?.DisposeAsync() ?? ValueTask.CompletedTask).ConfigureAwait(false);
server = null;
ConfigurationStorage.ActiveConfigurationChanged -= GetConfigurationEventHandler(configurationEvents.Writer);
ConfigurationStorage.ActiveConfigurationChanged -= configurationEvents.Writer.WriteConfigurationEvent;
configurationEvents.Writer.TryComplete();
await pollingLoopTask.ConfigureAwait(false);
pollingLoopTask = Task.CompletedTask;
Expand Down Expand Up @@ -355,4 +344,10 @@ protected override void Dispose(bool disposing)

base.Dispose(disposing);
}
}

file static class RaftClusterHelpers
{
internal static ValueTask WriteConfigurationEvent(this ChannelWriter<(EndPoint, bool)> writer, EndPoint address, bool isAdded, CancellationToken token)
=> writer.WriteAsync(new(address, isAdded), token);
}

0 comments on commit bbe2617

Please sign in to comment.