-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #73 from argon-chat/feature/lockdown_and_fixes
Lockdown feature + EF Optimization+ Fixes
- Loading branch information
Showing
24 changed files
with
860 additions
and
203 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
46 changes: 46 additions & 0 deletions
46
src/Argon.Api/Features/Orleanse/Streams/ClientArgonStream.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
namespace Argon.Api.Features.Rpc; | ||
|
||
using Orleans.Streams; | ||
using System.Threading.Channels; | ||
using Contracts; | ||
|
||
public sealed class ClientArgonStream<T> : IArgonStream<T> where T : IArgonEvent | ||
{ | ||
private StreamSubscriptionHandle<T> clientHandler { get; set; } | ||
private Channel<T> channel { get; } = Channel.CreateUnbounded<T>(); | ||
public async Task OnNextAsync(T item, StreamSequenceToken? token = null) | ||
=> await channel.Writer.WriteAsync(item); | ||
|
||
public Task OnCompletedAsync() | ||
{ | ||
channel.Writer.Complete(); | ||
return Task.CompletedTask; | ||
} | ||
public Task OnErrorAsync(Exception ex) | ||
{ | ||
channel.Writer.Complete(ex); | ||
return Task.CompletedTask; | ||
} | ||
|
||
public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default) | ||
{ | ||
while (await channel.Reader.WaitToReadAsync(ct)) | ||
{ | ||
while (channel.Reader.TryRead(out var serverEvent)) | ||
yield return serverEvent; | ||
} | ||
} | ||
|
||
public async ValueTask DisposeAsync() | ||
=> await clientHandler.UnsubscribeAsync(); | ||
|
||
|
||
internal async ValueTask<IArgonStream<T>> BindClient(IAsyncStream<T> stream) | ||
{ | ||
clientHandler = await stream.SubscribeAsync(this); | ||
return this; | ||
} | ||
|
||
public ValueTask Fire(T ev) | ||
=> throw new NotImplementedException($"Client stream cannot be fire event"); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
namespace Argon.Api.Features.Rpc; | ||
|
||
using ActualLab.Rpc; | ||
using Contracts; | ||
using Orleans.Streams; | ||
|
||
public interface IArgonStream<T> : | ||
IAsyncObserver<T>, IAsyncEnumerable<T>, IAsyncDisposable where T : IArgonEvent | ||
{ | ||
public RpcStream<T> AsRpcStream() => new(this); | ||
ValueTask Fire(T ev); | ||
} |
39 changes: 39 additions & 0 deletions
39
src/Argon.Api/Features/Orleanse/Streams/IStreamExtension.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
namespace Argon.Api.Features.Rpc; | ||
|
||
using Contracts; | ||
|
||
public interface IStreamExtension | ||
{ | ||
ValueTask<IArgonStream<IArgonEvent>> CreateClientStream(Guid primary); | ||
} | ||
|
||
public interface IStreamExtension<T> where T : Grain, IGrainWithGuidKey | ||
{ | ||
ValueTask<IArgonStream<IArgonEvent>> CreateServerStream(); | ||
ValueTask<IArgonStream<IArgonEvent>> CreateServerStreamFor(Guid targetId); | ||
} | ||
public static class ArgonStreamExtensions | ||
{ | ||
public static IStreamExtension<T> Streams<T>(this T grain) where T : Grain, IGrainWithGuidKey | ||
=> new StreamForGrainExtension<T>(grain); | ||
|
||
public static IStreamExtension Streams(this IClusterClient clusterClient) | ||
=> new StreamForClusterClientExtension(clusterClient); | ||
} | ||
public readonly struct StreamForGrainExtension<T>(T grain) : IStreamExtension<T> where T : Grain, IGrainWithGuidKey | ||
{ | ||
public ValueTask<IArgonStream<IArgonEvent>> CreateServerStream() | ||
=> CreateServerStreamFor(grain.GetPrimaryKey()); | ||
|
||
public async ValueTask<IArgonStream<IArgonEvent>> CreateServerStreamFor(Guid targetId) | ||
=> new ServerArgonStream<IArgonEvent>(grain.GetStreamProvider(IArgonEvent.ProviderId) | ||
.GetStream<IArgonEvent>(StreamId.Create(IArgonEvent.Namespace, targetId))); | ||
} | ||
|
||
public readonly struct StreamForClusterClientExtension(IClusterClient? client) : IStreamExtension | ||
{ | ||
public ValueTask<IArgonStream<IArgonEvent>> CreateClientStream(Guid primary) | ||
=> new ClientArgonStream<IArgonEvent>().BindClient(client | ||
.GetStreamProvider(IArgonEvent.ProviderId) | ||
.GetStream<IArgonEvent>(StreamId.Create(IArgonEvent.Namespace, primary))); | ||
} |
23 changes: 23 additions & 0 deletions
23
src/Argon.Api/Features/Orleanse/Streams/ServerArgonStream.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
namespace Argon.Api.Features.Rpc; | ||
|
||
using Contracts; | ||
using Orleans.Streams; | ||
|
||
public sealed class ServerArgonStream<T>(IAsyncStream<IArgonEvent> stream) : IArgonStream<T> where T : IArgonEvent | ||
{ | ||
public Task OnNextAsync(T item, StreamSequenceToken? token = null) | ||
=> stream.OnNextAsync(item, token); | ||
|
||
public Task OnCompletedAsync() | ||
=> stream.OnCompletedAsync(); | ||
public Task OnErrorAsync(Exception ex) | ||
=> stream.OnErrorAsync(ex); | ||
|
||
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default) | ||
=> throw new NotImplementedException($"Server stream cannot create async enumerator"); | ||
|
||
public async ValueTask DisposeAsync() {} // nothing any to dispose | ||
|
||
public async ValueTask Fire(T ev) | ||
=> await OnNextAsync(ev); | ||
} |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.