Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extended server methods #100

Merged
merged 11 commits into from
Dec 6, 2024
4 changes: 3 additions & 1 deletion src/Argon.Api/Features/Orleanse/OrleansExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace Argon.Features;

using ActualLab.Serialization;
using Env;
using MessagePack.Formatters;
using MessagePack.Resolvers;
using Orleans.Clustering.Kubernetes;
using Orleans.Configuration;
Expand All @@ -17,7 +18,8 @@ public static WebApplicationBuilder AddOrleans(this WebApplicationBuilder builde
{
var options = MessagePackSerializerOptions.Standard
.WithResolver(CompositeResolver.Create(
DynamicEnumAsStringResolver.Instance,
DynamicEnumAsStringResolver.Instance,
EitherFormatterResolver.Instance,
StandardResolver.Instance));
MessagePackSerializer.DefaultOptions = options;
builder.Services.AddSerializer(x => x.AddMessagePackSerializer(null, null, MessagePackSerializer.DefaultOptions));
Expand Down
5 changes: 4 additions & 1 deletion src/Argon.Api/Features/Rpc/ArgonDescriptorStorage.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
namespace Argon.Services;

public record ArgonTransportOptions(Dictionary<Type, Type> Services);
public record ArgonTransportOptions
{
public Dictionary<Type, Type> Services { get; } = new();
}

public class ArgonDescriptorStorage
{
Expand Down
1 change: 1 addition & 0 deletions src/Argon.Api/Features/Rpc/ArgonTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ namespace Argon.Services;
using Google.Protobuf;
using Grpc.Core;
using Grpc.Net.Client.Configuration;
using MessagePack.Resolvers;
using Transport;

public class ArgonTransport(IServiceProvider provider, ArgonDescriptorStorage storage, ILogger<ArgonTransport> logger) : Transport.ArgonTransport.ArgonTransportBase
Expand Down
8 changes: 7 additions & 1 deletion src/Argon.Api/Features/Rpc/AuthInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@ namespace Argon.Services;
using Grpc.Core;
using Grpc.Core.Interceptors;

public class AuthInterceptor(TokenAuthorization tokenAuthorization) : Interceptor
public class AuthInterceptor(TokenAuthorization tokenAuthorization, ILogger<AuthInterceptor> logger) : Interceptor
{
public async override Task<TResponse> UnaryServerHandler<TRequest, TResponse>(TRequest request, ServerCallContext context,
UnaryServerMethod<TRequest, TResponse> continuation)
{
var authorizationHeader = context.RequestHeaders
.FirstOrDefault(h => h.Key.Equals("authorize", StringComparison.InvariantCultureIgnoreCase));
if (authorizationHeader is null)
{
logger.LogWarning($"No authorization token is defined, skip...");
return await continuation(request, context);
}

var authResult = await tokenAuthorization.AuthorizeByToken(authorizationHeader.Value);

if (!authResult.IsSuccess)
{
logger.LogError($"Failed authorization, error: {authResult.Error}");
return await continuation(request, context);
}

context.UserState.Add("userToken", authResult.Value);

Expand Down
9 changes: 8 additions & 1 deletion src/Argon.Api/Grains/ChannelGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public async override Task OnActivateAsync(CancellationToken cancellationToken)
_userStateEmitter = await this.Streams().CreateServerStream();
}

public async Task<List<RealtimeChannelUser>> GetMembers()
=> state.State.Users.Select(x => x.Value).ToList();


// no needed send StreamId too, id is can be computed
public async Task<Maybe<RealtimeToken>> Join(Guid userId)
Expand All @@ -39,7 +42,11 @@ public async Task<Maybe<RealtimeToken>> Join(Guid userId)
}
else
{
state.State.Users.Add(userId, new ChannelRealtimeMember(userId));
state.State.Users.Add(userId, new RealtimeChannelUser()
{
UserId = userId,
State = ChannelMemberState.NONE
});
await state.WriteStateAsync();
}

Expand Down
2 changes: 1 addition & 1 deletion src/Argon.Api/Grains/EmailManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public async Task SendOtpCodeAsync(string email, string otpCode, TimeSpan validi
}
});

await Client.SendMailAsync(new MailMessage(smtpOptions.Value.User, email, $"Your Argon verification code: {otpCode}", form)
await Client.SendMailAsync(new MailMessage(smtpOptions.Value.User, email, $"Your Argon verification code", form)
{
IsBodyHtml = true
});
Expand Down
3 changes: 3 additions & 0 deletions src/Argon.Api/Grains/Interfaces/IChannelGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public interface IChannelGrain : IGrainWithGuidKey
[Alias("UpdateChannel")]
Task<Channel> UpdateChannel(ChannelInput input);

[Alias("GetMembers")]
Task<List<RealtimeChannelUser>> GetMembers();


// for join\leave\mute\unmute notifications
public const string UserTransformNotificationStream = $"{nameof(IChannelGrain)}.user.transform";
Expand Down
6 changes: 6 additions & 0 deletions src/Argon.Api/Grains/Interfaces/IServerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ public interface IServerGrain : IGrainWithGuidKey
[Alias("SetUserStatus")]
ValueTask SetUserStatus(Guid userId, UserStatus status);

[Alias("GetMembers")]
Task<List<RealtimeServerMember>> GetMembers();

[Alias("GetChannels")]
Task<List<RealtimeChannel>> GetChannels();

public const string ProviderId = "argon.server.grain.stream";
public const string EventNamespace = "@";
}
Expand Down
42 changes: 33 additions & 9 deletions src/Argon.Api/Grains/ServerGrain.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ public async Task<Either<Server, ServerCreationError>> CreateServer(ServerInput
return await GetServer();
}

public Task<Server> GetServer() => GetAsync();
public Task<Server> GetServer() => context.Servers
.FirstAsync(s => s.Id == this.GetPrimaryKey());

public async Task<Server> UpdateServer(ServerInput input)
{
var server = await GetAsync();
var server = await context.Servers
.FirstAsync(s => s.Id == this.GetPrimaryKey());

var copy = server with { };
server.Name = input.Name ?? server.Name;
Expand All @@ -40,7 +42,35 @@ public async Task<Server> UpdateServer(ServerInput input)
context.Servers.Update(server);
await context.SaveChangesAsync();
await _serverEvents.Fire(new ServerModified(ObjDiff.Compare(copy, server)));
return await GetAsync();
return await context.Servers
.FirstAsync(s => s.Id == this.GetPrimaryKey());
}

public async Task<List<RealtimeServerMember>> GetMembers()
{
var members = await context.UsersToServerRelations.Where(x => x.ServerId == this.GetPrimaryKey())
.ToListAsync();

return members.Select(x => new RealtimeServerMember
{
Member = x,
Status = realtimeState.State.UserStatuses.TryGetValue(x.UserId, out var status) ? status : UserStatus.Offline
}).ToList();
}

public async Task<List<RealtimeChannel>> GetChannels()
{
var channels = await context.Channels
.Where(x => x.ServerId == this.GetPrimaryKey())
.ToListAsync();

var results = await Task.WhenAll(channels.Select(async x => new RealtimeChannel()
{
Channel = x,
Users = await grainFactory.GetGrain<IChannelGrain>(x.Id).GetMembers()
}).ToList());

return results.ToList();
}

public async ValueTask UserJoined(Guid userId)
Expand Down Expand Up @@ -77,10 +107,4 @@ public async Task<Channel> CreateChannel(ChannelInput input, Guid initiator)
await _serverEvents.Fire(new ChannelCreated(channel));
return channel;
}

private async Task<Server> GetAsync() =>
await context.Servers
.Include(x => x.Channels)
.Include(x => x.Users)
.FirstAsync(s => s.Id == this.GetPrimaryKey());
}
2 changes: 1 addition & 1 deletion src/Argon.Api/Grains/States/ChannelGrainState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace Argon.Grains.Persistence.States;
public sealed partial record ChannelGrainState
{
[DataMember(Order = 0), MemoryPackOrder(0), Id(0)]
public Dictionary<Guid, ChannelRealtimeMember> Users { get; set; } = new();
public Dictionary<Guid, RealtimeChannelUser> Users { get; set; } = new();
}

[DataContract, MemoryPackable(GenerateType.VersionTolerant), MessagePackObject(true), Serializable, GenerateSerializer]
Expand Down
28 changes: 17 additions & 11 deletions src/Argon.Api/Services/Transport/ServerInteraction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,30 @@ namespace Argon.Services;

public class ServerInteraction(IGrainFactory grainFactory) : IServerInteraction
{
public async Task<CreateChannelResponse> CreateChannel(CreateChannelRequest request)
{
var user = this.GetUser();
var result = await grainFactory
public async Task CreateChannel(CreateChannelRequest request)
=> await grainFactory
.GetGrain<IServerGrain>(request.serverId)
.CreateChannel(new ChannelInput(request.name, new ChannelEntitlementOverwrite(), request.desc, request.kind), user.id);
return new CreateChannelResponse(request.serverId, result.Id);
}
.CreateChannel(new ChannelInput(request.name, new ChannelEntitlementOverwrite(), request.desc, request.kind), this.GetUser().id);

public Task DeleteChannel(DeleteChannelRequest request)
public Task DeleteChannel(Guid serverId, Guid channelId)
=> throw new NotImplementedException();

public async Task<ChannelJoinResponse> JoinToVoiceChannel(JoinToVoiceChannelRequest request)
public async Task<string> JoinToVoiceChannel(Guid serverId, Guid channelId)
{
var user = this.GetUser();
var result = await grainFactory
.GetGrain<IChannelGrain>(request.channelId)
.GetGrain<IChannelGrain>(channelId)
.Join(user.id);
return new ChannelJoinResponse(result.Value.value);
return result.Value.value;
}

public Task<List<RealtimeChannel>> GetChannels(Guid serverId)
=> grainFactory
.GetGrain<IServerGrain>(serverId)
.GetChannels();

public Task<List<RealtimeServerMember>> GetMembers(Guid serverId)
=> grainFactory
.GetGrain<IServerGrain>(serverId)
.GetMembers();
}
30 changes: 2 additions & 28 deletions src/Argon.Api/Services/Transport/UserInteraction.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
namespace Argon.Services;

using Features.MediaStorage;

public class UserInteraction(IGrainFactory grainFactory, IContentDeliveryNetwork cdn) : IUserInteraction
public class UserInteraction(IGrainFactory grainFactory) : IUserInteraction
{
public async Task<User> GetMe()
{
Expand All @@ -24,7 +22,7 @@ public async Task<List<Server>> GetServers()
{
var userData = this.GetUser();
var servers = await grainFactory.GetGrain<IUserGrain>(userData.id).GetMyServers();
return servers.Select(RegenerateAvatarUrl).ToList();
return servers;
}

[AllowAnonymous]
Expand Down Expand Up @@ -58,28 +56,4 @@ public async Task<Maybe<RegistrationError>> Registration(NewUserCredentialsInput
.Register(input, connInfo);
return result;
}

private Server RegenerateAvatarUrl(Server s)
{
if (string.IsNullOrEmpty(s.AvatarFileId))
return RegenerateUsersAvatars(s);
return RegenerateUsersAvatars(s) with
{
AvatarFileId = cdn.GenerateAssetUrl(StorageNameSpace.ForServer(s.Id), AssetId.FromFileId(s.AvatarFileId!))
};
}


private Server RegenerateUsersAvatars(Server s)
{
if (s.Users.Count == 0)
return s;

foreach (var user in s.Users.Where(x => x.User is { AvatarFileId: not null }))
{
user.User.AvatarFileId = cdn.GenerateAssetUrl(StorageNameSpace.ForUser(user.Id), AssetId.FromFileId(s.AvatarFileId!));
}

return s;
}
}
Loading
Loading