diff --git a/src/Argon.Api/Features/Orleanse/OrleansExtension.cs b/src/Argon.Api/Features/Orleanse/OrleansExtension.cs index fad1e69..4b566b6 100644 --- a/src/Argon.Api/Features/Orleanse/OrleansExtension.cs +++ b/src/Argon.Api/Features/Orleanse/OrleansExtension.cs @@ -2,6 +2,7 @@ namespace Argon.Features; using ActualLab.Serialization; using Env; +using MessagePack.Formatters; using MessagePack.Resolvers; using Orleans.Clustering.Kubernetes; using Orleans.Configuration; @@ -17,7 +18,8 @@ public static WebApplicationBuilder AddOrleans(this WebApplicationBuilder builde { var options = MessagePackSerializerOptions.Standard .WithResolver(CompositeResolver.Create( - DynamicEnumAsStringResolver.Instance, + DynamicEnumAsStringResolver.Instance, + EitherFormatterResolver.Instance, StandardResolver.Instance)); MessagePackSerializer.DefaultOptions = options; builder.Services.AddSerializer(x => x.AddMessagePackSerializer(null, null, MessagePackSerializer.DefaultOptions)); diff --git a/src/Argon.Api/Features/Rpc/ArgonDescriptorStorage.cs b/src/Argon.Api/Features/Rpc/ArgonDescriptorStorage.cs index 696e1d1..3a5f07d 100644 --- a/src/Argon.Api/Features/Rpc/ArgonDescriptorStorage.cs +++ b/src/Argon.Api/Features/Rpc/ArgonDescriptorStorage.cs @@ -1,6 +1,9 @@ namespace Argon.Services; -public record ArgonTransportOptions(Dictionary Services); +public record ArgonTransportOptions +{ + public Dictionary Services { get; } = new(); +} public class ArgonDescriptorStorage { diff --git a/src/Argon.Api/Features/Rpc/ArgonTransport.cs b/src/Argon.Api/Features/Rpc/ArgonTransport.cs index 4d841f8..1776f13 100644 --- a/src/Argon.Api/Features/Rpc/ArgonTransport.cs +++ b/src/Argon.Api/Features/Rpc/ArgonTransport.cs @@ -3,6 +3,7 @@ namespace Argon.Services; using Google.Protobuf; using Grpc.Core; using Grpc.Net.Client.Configuration; +using MessagePack.Resolvers; using Transport; public class ArgonTransport(IServiceProvider provider, ArgonDescriptorStorage storage, ILogger logger) : Transport.ArgonTransport.ArgonTransportBase diff --git a/src/Argon.Api/Features/Rpc/AuthInterceptor.cs b/src/Argon.Api/Features/Rpc/AuthInterceptor.cs index 9b31f4b..facd400 100644 --- a/src/Argon.Api/Features/Rpc/AuthInterceptor.cs +++ b/src/Argon.Api/Features/Rpc/AuthInterceptor.cs @@ -4,7 +4,7 @@ namespace Argon.Services; using Grpc.Core; using Grpc.Core.Interceptors; -public class AuthInterceptor(TokenAuthorization tokenAuthorization) : Interceptor +public class AuthInterceptor(TokenAuthorization tokenAuthorization, ILogger logger) : Interceptor { public async override Task UnaryServerHandler(TRequest request, ServerCallContext context, UnaryServerMethod continuation) @@ -12,12 +12,18 @@ public async override Task UnaryServerHandler(TR var authorizationHeader = context.RequestHeaders .FirstOrDefault(h => h.Key.Equals("authorize", StringComparison.InvariantCultureIgnoreCase)); if (authorizationHeader is null) + { + logger.LogWarning($"No authorization token is defined, skip..."); return await continuation(request, context); + } var authResult = await tokenAuthorization.AuthorizeByToken(authorizationHeader.Value); if (!authResult.IsSuccess) + { + logger.LogError($"Failed authorization, error: {authResult.Error}"); return await continuation(request, context); + } context.UserState.Add("userToken", authResult.Value); diff --git a/src/Argon.Api/Grains/ChannelGrain.cs b/src/Argon.Api/Grains/ChannelGrain.cs index 1d295a8..ded42b6 100644 --- a/src/Argon.Api/Grains/ChannelGrain.cs +++ b/src/Argon.Api/Grains/ChannelGrain.cs @@ -25,6 +25,9 @@ public async override Task OnActivateAsync(CancellationToken cancellationToken) _userStateEmitter = await this.Streams().CreateServerStream(); } + public async Task> GetMembers() + => state.State.Users.Select(x => x.Value).ToList(); + // no needed send StreamId too, id is can be computed public async Task> Join(Guid userId) @@ -39,7 +42,11 @@ public async Task> Join(Guid userId) } else { - state.State.Users.Add(userId, new ChannelRealtimeMember(userId)); + state.State.Users.Add(userId, new RealtimeChannelUser() + { + UserId = userId, + State = ChannelMemberState.NONE + }); await state.WriteStateAsync(); } diff --git a/src/Argon.Api/Grains/EmailManager.cs b/src/Argon.Api/Grains/EmailManager.cs index 354aa88..8ec9554 100644 --- a/src/Argon.Api/Grains/EmailManager.cs +++ b/src/Argon.Api/Grains/EmailManager.cs @@ -34,7 +34,7 @@ public async Task SendOtpCodeAsync(string email, string otpCode, TimeSpan validi } }); - await Client.SendMailAsync(new MailMessage(smtpOptions.Value.User, email, $"Your Argon verification code: {otpCode}", form) + await Client.SendMailAsync(new MailMessage(smtpOptions.Value.User, email, $"Your Argon verification code", form) { IsBodyHtml = true }); diff --git a/src/Argon.Api/Grains/Interfaces/IChannelGrain.cs b/src/Argon.Api/Grains/Interfaces/IChannelGrain.cs index 81a8bdc..7785b97 100644 --- a/src/Argon.Api/Grains/Interfaces/IChannelGrain.cs +++ b/src/Argon.Api/Grains/Interfaces/IChannelGrain.cs @@ -18,6 +18,9 @@ public interface IChannelGrain : IGrainWithGuidKey [Alias("UpdateChannel")] Task UpdateChannel(ChannelInput input); + [Alias("GetMembers")] + Task> GetMembers(); + // for join\leave\mute\unmute notifications public const string UserTransformNotificationStream = $"{nameof(IChannelGrain)}.user.transform"; diff --git a/src/Argon.Api/Grains/Interfaces/IServerGrain.cs b/src/Argon.Api/Grains/Interfaces/IServerGrain.cs index 636cb67..1354d5c 100644 --- a/src/Argon.Api/Grains/Interfaces/IServerGrain.cs +++ b/src/Argon.Api/Grains/Interfaces/IServerGrain.cs @@ -21,6 +21,12 @@ public interface IServerGrain : IGrainWithGuidKey [Alias("SetUserStatus")] ValueTask SetUserStatus(Guid userId, UserStatus status); + [Alias("GetMembers")] + Task> GetMembers(); + + [Alias("GetChannels")] + Task> GetChannels(); + public const string ProviderId = "argon.server.grain.stream"; public const string EventNamespace = "@"; } diff --git a/src/Argon.Api/Grains/ServerGrain.cs b/src/Argon.Api/Grains/ServerGrain.cs index e5558a3..77bdc72 100644 --- a/src/Argon.Api/Grains/ServerGrain.cs +++ b/src/Argon.Api/Grains/ServerGrain.cs @@ -27,11 +27,13 @@ public async Task> CreateServer(ServerInput return await GetServer(); } - public Task GetServer() => GetAsync(); + public Task GetServer() => context.Servers + .FirstAsync(s => s.Id == this.GetPrimaryKey()); public async Task UpdateServer(ServerInput input) { - var server = await GetAsync(); + var server = await context.Servers + .FirstAsync(s => s.Id == this.GetPrimaryKey()); var copy = server with { }; server.Name = input.Name ?? server.Name; @@ -40,7 +42,35 @@ public async Task UpdateServer(ServerInput input) context.Servers.Update(server); await context.SaveChangesAsync(); await _serverEvents.Fire(new ServerModified(ObjDiff.Compare(copy, server))); - return await GetAsync(); + return await context.Servers + .FirstAsync(s => s.Id == this.GetPrimaryKey()); + } + + public async Task> GetMembers() + { + var members = await context.UsersToServerRelations.Where(x => x.ServerId == this.GetPrimaryKey()) + .ToListAsync(); + + return members.Select(x => new RealtimeServerMember + { + Member = x, + Status = realtimeState.State.UserStatuses.TryGetValue(x.UserId, out var status) ? status : UserStatus.Offline + }).ToList(); + } + + public async Task> GetChannels() + { + var channels = await context.Channels + .Where(x => x.ServerId == this.GetPrimaryKey()) + .ToListAsync(); + + var results = await Task.WhenAll(channels.Select(async x => new RealtimeChannel() + { + Channel = x, + Users = await grainFactory.GetGrain(x.Id).GetMembers() + }).ToList()); + + return results.ToList(); } public async ValueTask UserJoined(Guid userId) @@ -77,10 +107,4 @@ public async Task CreateChannel(ChannelInput input, Guid initiator) await _serverEvents.Fire(new ChannelCreated(channel)); return channel; } - - private async Task GetAsync() => - await context.Servers - .Include(x => x.Channels) - .Include(x => x.Users) - .FirstAsync(s => s.Id == this.GetPrimaryKey()); } \ No newline at end of file diff --git a/src/Argon.Api/Grains/States/ChannelGrainState.cs b/src/Argon.Api/Grains/States/ChannelGrainState.cs index 436da1c..2300ef5 100644 --- a/src/Argon.Api/Grains/States/ChannelGrainState.cs +++ b/src/Argon.Api/Grains/States/ChannelGrainState.cs @@ -4,7 +4,7 @@ namespace Argon.Grains.Persistence.States; public sealed partial record ChannelGrainState { [DataMember(Order = 0), MemoryPackOrder(0), Id(0)] - public Dictionary Users { get; set; } = new(); + public Dictionary Users { get; set; } = new(); } [DataContract, MemoryPackable(GenerateType.VersionTolerant), MessagePackObject(true), Serializable, GenerateSerializer] diff --git a/src/Argon.Api/Services/Transport/ServerInteraction.cs b/src/Argon.Api/Services/Transport/ServerInteraction.cs index af28b77..622925a 100644 --- a/src/Argon.Api/Services/Transport/ServerInteraction.cs +++ b/src/Argon.Api/Services/Transport/ServerInteraction.cs @@ -2,24 +2,30 @@ namespace Argon.Services; public class ServerInteraction(IGrainFactory grainFactory) : IServerInteraction { - public async Task CreateChannel(CreateChannelRequest request) - { - var user = this.GetUser(); - var result = await grainFactory + public async Task CreateChannel(CreateChannelRequest request) + => await grainFactory .GetGrain(request.serverId) - .CreateChannel(new ChannelInput(request.name, new ChannelEntitlementOverwrite(), request.desc, request.kind), user.id); - return new CreateChannelResponse(request.serverId, result.Id); - } + .CreateChannel(new ChannelInput(request.name, new ChannelEntitlementOverwrite(), request.desc, request.kind), this.GetUser().id); - public Task DeleteChannel(DeleteChannelRequest request) + public Task DeleteChannel(Guid serverId, Guid channelId) => throw new NotImplementedException(); - public async Task JoinToVoiceChannel(JoinToVoiceChannelRequest request) + public async Task JoinToVoiceChannel(Guid serverId, Guid channelId) { var user = this.GetUser(); var result = await grainFactory - .GetGrain(request.channelId) + .GetGrain(channelId) .Join(user.id); - return new ChannelJoinResponse(result.Value.value); + return result.Value.value; } + + public Task> GetChannels(Guid serverId) + => grainFactory + .GetGrain(serverId) + .GetChannels(); + + public Task> GetMembers(Guid serverId) + => grainFactory + .GetGrain(serverId) + .GetMembers(); } \ No newline at end of file diff --git a/src/Argon.Api/Services/Transport/UserInteraction.cs b/src/Argon.Api/Services/Transport/UserInteraction.cs index 32a062d..de2c142 100644 --- a/src/Argon.Api/Services/Transport/UserInteraction.cs +++ b/src/Argon.Api/Services/Transport/UserInteraction.cs @@ -1,8 +1,6 @@ namespace Argon.Services; -using Features.MediaStorage; - -public class UserInteraction(IGrainFactory grainFactory, IContentDeliveryNetwork cdn) : IUserInteraction +public class UserInteraction(IGrainFactory grainFactory) : IUserInteraction { public async Task GetMe() { @@ -24,7 +22,7 @@ public async Task> GetServers() { var userData = this.GetUser(); var servers = await grainFactory.GetGrain(userData.id).GetMyServers(); - return servers.Select(RegenerateAvatarUrl).ToList(); + return servers; } [AllowAnonymous] @@ -58,28 +56,4 @@ public async Task> Registration(NewUserCredentialsInput .Register(input, connInfo); return result; } - - private Server RegenerateAvatarUrl(Server s) - { - if (string.IsNullOrEmpty(s.AvatarFileId)) - return RegenerateUsersAvatars(s); - return RegenerateUsersAvatars(s) with - { - AvatarFileId = cdn.GenerateAssetUrl(StorageNameSpace.ForServer(s.Id), AssetId.FromFileId(s.AvatarFileId!)) - }; - } - - - private Server RegenerateUsersAvatars(Server s) - { - if (s.Users.Count == 0) - return s; - - foreach (var user in s.Users.Where(x => x.User is { AvatarFileId: not null })) - { - user.User.AvatarFileId = cdn.GenerateAssetUrl(StorageNameSpace.ForUser(user.Id), AssetId.FromFileId(s.AvatarFileId!)); - } - - return s; - } } \ No newline at end of file diff --git a/src/Argon.Contracts/Either.cs b/src/Argon.Contracts/Either.cs index 008e8de..2ec215c 100644 --- a/src/Argon.Contracts/Either.cs +++ b/src/Argon.Contracts/Either.cs @@ -1,7 +1,100 @@ namespace Argon; +using MessagePack.Formatters; +using MessagePack.Resolvers; + +public class EitherFormatterResolver : IFormatterResolver +{ + public static readonly EitherFormatterResolver Instance = new(); + + private EitherFormatterResolver() { } + + public IMessagePackFormatter? GetFormatter() + { + var type = typeof(T); + if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Either<,>)) + { + var resultType = type.GetGenericArguments()[0]; + var errorType = type.GetGenericArguments()[1]; + + var formatterType = typeof(EitherFormatter<,>).MakeGenericType(resultType, errorType); + return (IMessagePackFormatter)Activator.CreateInstance(formatterType)!; + } + + return StandardResolver.Instance.GetFormatter(); + } +} +public class EitherFormatter : IMessagePackFormatter> + where TResult : class +{ + public void Serialize(ref MessagePackWriter writer, Either value, MessagePackSerializerOptions options) + { + var resolver = options.Resolver; + + writer.WriteMapHeader(3); + + writer.Write("success"); + writer.Write(value.IsSuccess); + + writer.Write("result"); + if (value.IsSuccess) + resolver.GetFormatterWithVerify().Serialize(ref writer, value.Value, options); + else + writer.WriteNil(); + + writer.Write(nameof(Either.Error).ToLowerInvariant()); + if (!value.IsSuccess) + resolver.GetFormatterWithVerify().Serialize(ref writer, value.Error, options); + else + writer.WriteNil(); + } + + public Either Deserialize(ref MessagePackReader reader, MessagePackSerializerOptions options) + { + var resolver = options.Resolver; + + var count = reader.ReadMapHeader(); + if (count != 3) + throw new InvalidOperationException("Invalid data format for Either."); + + var isSuccess = false; + TResult? result = null; + TError? error = default; + + for (var i = 0; i < count; i++) + { + var propertyName = reader.ReadString(); + switch (propertyName) + { + case nameof(Either.IsSuccess): + isSuccess = reader.ReadBoolean(); + break; + case nameof(Either.Value): + if (isSuccess) + result = resolver.GetFormatterWithVerify().Deserialize(ref reader, options); + else + reader.Skip(); + break; + + case nameof(Either.Error): + if (!isSuccess) + error = resolver.GetFormatterWithVerify().Deserialize(ref reader, options); + else + reader.Skip(); + break; + + default: + reader.Skip(); + break; + } + } + return isSuccess + ? Either.Success(result!) + : Either.Failure(error!); + } +} [JsonObject, MessagePackObject(true), Serializable] -public readonly record struct Either where TResult : class +public record struct Either where TResult : class { private Either(TResult result) { @@ -21,10 +114,10 @@ internal Either(TResult result, TError error) _error = error; } - [JsonProperty("result"), MessagePack.Key(0)] + [JsonProperty("result")] private TResult? _result { get; init; } - [JsonProperty("error"), MessagePack.Key(1)] + [JsonProperty("error")] private TError? _error { get; init; } [JsonIgnore, IgnoreMember] @@ -43,7 +136,7 @@ internal Either(TResult result, TError error) public static implicit operator Either(TError error) => new(error); } -[JsonObject, MessagePackObject, Serializable] +[JsonObject, MessagePackObject(true), Serializable] public readonly record struct Maybe { [JsonProperty("value"), MessagePack.Key(0)] diff --git a/src/Argon.Contracts/IServerInteraction.cs b/src/Argon.Contracts/IServerInteraction.cs index b4f8ee5..2ab84e6 100644 --- a/src/Argon.Contracts/IServerInteraction.cs +++ b/src/Argon.Contracts/IServerInteraction.cs @@ -5,7 +5,11 @@ namespace Argon; [TsInterface] public interface IServerInteraction : IArgonService { - Task CreateChannel(CreateChannelRequest request); - Task DeleteChannel(DeleteChannelRequest request); - Task JoinToVoiceChannel(JoinToVoiceChannelRequest request); + Task CreateChannel(CreateChannelRequest request); + Task DeleteChannel(Guid serverId, Guid channelId); + Task JoinToVoiceChannel(Guid serverId, Guid channelId); + + + Task> GetChannels(Guid serverId); + Task> GetMembers(Guid serverId); } \ No newline at end of file diff --git a/src/Argon.Contracts/Servers/Channel.cs b/src/Argon.Contracts/Servers/Channel.cs index 3582c60..206fab7 100644 --- a/src/Argon.Contracts/Servers/Channel.cs +++ b/src/Argon.Contracts/Servers/Channel.cs @@ -21,4 +21,31 @@ public record Channel : ArgonEntityWithOwnership, IArchetypeObject public virtual ICollection EntitlementOverwrites { get; set; } = new List(); public ICollection Overwrites => EntitlementOverwrites.OfType().ToList(); +} + +[TsInterface, MessagePackObject(true)] +public record RealtimeChannel +{ + public Channel Channel { get; set; } + + public List Users { get; set; } +} + +[TsInterface, MessagePackObject(true)] +public record RealtimeChannelUser +{ + public Guid UserId { get; set; } + + public ChannelMemberState State { get; set; } +} + +[Flags] +public enum ChannelMemberState +{ + NONE = 0, + MUTED = 1 << 1, + MUTED_BY_SERVER = 1 << 2, + MUTED_HEADPHONES = 1 << 3, + MUTED_HEADPHONES_BY_SERVER = 1 << 4, + STREAMING = 1 << 5 } \ No newline at end of file diff --git a/src/Argon.Contracts/Servers/ChannelJoinResponse.cs b/src/Argon.Contracts/Servers/ChannelJoinResponse.cs deleted file mode 100644 index fa8f7a4..0000000 --- a/src/Argon.Contracts/Servers/ChannelJoinResponse.cs +++ /dev/null @@ -1,5 +0,0 @@ -namespace Argon; - -[TsInterface, MessagePackObject(true)] -public sealed record ChannelJoinResponse( - string Token); \ No newline at end of file diff --git a/src/Argon.Contracts/Servers/ChannelRealtimeMember.cs b/src/Argon.Contracts/Servers/ChannelRealtimeMember.cs deleted file mode 100644 index 09fed30..0000000 --- a/src/Argon.Contracts/Servers/ChannelRealtimeMember.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace Argon; - -[TsInterface, MessagePackObject(true)] -public record ChannelRealtimeMember(Guid UserId); \ No newline at end of file diff --git a/src/Argon.Contracts/Servers/ServerMember.cs b/src/Argon.Contracts/Servers/ServerMember.cs index 511eaa1..59c3ebb 100644 --- a/src/Argon.Contracts/Servers/ServerMember.cs +++ b/src/Argon.Contracts/Servers/ServerMember.cs @@ -17,4 +17,10 @@ public record ServerMember : ArgonEntityWithOwnership public ICollection ServerMemberArchetypes { get; set; } = new List(); +} +[TsInterface, MessagePackObject(true)] +public record RealtimeServerMember +{ + public ServerMember Member { get; set; } + public UserStatus Status { get; set; } } \ No newline at end of file diff --git a/src/Argon.Contracts/Streaming/Events/DeleteChannelRequest.cs b/src/Argon.Contracts/Streaming/Events/DeleteChannelRequest.cs deleted file mode 100644 index 1d62de2..0000000 --- a/src/Argon.Contracts/Streaming/Events/DeleteChannelRequest.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace Argon.Streaming.Events; - -[TsInterface, MessagePackObject(true)] -public record DeleteChannelRequest(Guid serverId, Guid channelId); \ No newline at end of file diff --git a/src/Argon.Contracts/Streaming/Events/JoinToVoiceChannelRequest.cs b/src/Argon.Contracts/Streaming/Events/JoinToVoiceChannelRequest.cs deleted file mode 100644 index 674d0f2..0000000 --- a/src/Argon.Contracts/Streaming/Events/JoinToVoiceChannelRequest.cs +++ /dev/null @@ -1,4 +0,0 @@ -namespace Argon.Streaming.Events; - -[TsInterface, MessagePackObject(true)] -public record JoinToVoiceChannelRequest(Guid serverId, Guid channelId); \ No newline at end of file diff --git a/src/Argon.Contracts/Users/AuthorizationError.cs b/src/Argon.Contracts/Users/AuthorizationError.cs index 3b20c95..f2e7dcd 100644 --- a/src/Argon.Contracts/Users/AuthorizationError.cs +++ b/src/Argon.Contracts/Users/AuthorizationError.cs @@ -1,8 +1,8 @@ namespace Argon.Users; -[TsEnum] public enum AuthorizationError { + NONE, BAD_CREDENTIALS, REQUIRED_OTP, BAD_OTP diff --git a/src/Argon.Contracts/Users/LockdownReason.cs b/src/Argon.Contracts/Users/LockdownReason.cs index f5804a6..af400a6 100644 --- a/src/Argon.Contracts/Users/LockdownReason.cs +++ b/src/Argon.Contracts/Users/LockdownReason.cs @@ -1,6 +1,5 @@ namespace Argon.Users; -[TsEnum] public enum LockdownReason { NONE = 0, diff --git a/src/Argon.Contracts/Users/RegistrationError.cs b/src/Argon.Contracts/Users/RegistrationError.cs index 9c93bd5..3d549e1 100644 --- a/src/Argon.Contracts/Users/RegistrationError.cs +++ b/src/Argon.Contracts/Users/RegistrationError.cs @@ -1,6 +1,5 @@ namespace Argon.Users; -[TsEnum] public enum RegistrationError { USERNAME_ALREADY_TAKEN,