Skip to content

Commit

Permalink
feat(server): add maximum concurrent connector requests limitation
Browse files Browse the repository at this point in the history
  • Loading branch information
thomashilzendegen authored and gingters committed Nov 9, 2023
1 parent 7ab8e1a commit d7938fb
Show file tree
Hide file tree
Showing 19 changed files with 747 additions and 27 deletions.
11 changes: 11 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ If a tenant has `RequireAuthentication` enabled in the database, the RelayServer
when the request contains an access token from it's own issuer and audience (e.g., it comes
from a connector). In any other case it returns 401.

### Maximum concurrent connector requests

Setting `MaximumConcurrentConnectorRequests` to a positive value greater than 0 and less than
65.536 limits the amount of requests which will be send to a connector while other requests
are still pending.

__Caution__: If the limit is activated, all requests for that tenant need to be switched to an
acknowledge mode `ConnectorFinished` when they are not set to `Manual`. This means, compared
to the `Disabled` mode, the request will be re-queued if an error raises during processing by the
connector and the acknowledgment isn't send.

## Connector

The `RelayConnectorOptions` type provides the main configuration for the connector. These
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ public class Tenant
/// <remarks>The maximum length is 1000 unicode characters.</remarks>
public string? Description { get; set; }

/// <summary>
/// The maximum amount of concurrent requests a connector should receive.
/// <remarks>Defaults to 0 (unlimited).</remarks>
/// </summary>
public int MaximumConcurrentConnectorRequests { get; set; }

/// <summary>
/// Enable the requirement that only an authenticated request can use this tenant to relay requests.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,19 @@ public ConnectorRegistry(ILogger<ConnectorRegistry<T>> logger, IServiceProvider
/// </summary>
/// <param name="connectionId">The unique id of the connection.</param>
/// <param name="tenantName">The unique name of the tenant.</param>
/// <param name="maximumConcurrentRequests">The amount of maximum concurrent requests.</param>
/// <param name="remoteIpAddress">The optional remote ip address of the connection.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
public async Task RegisterAsync(string connectionId, string tenantName, IPAddress? remoteIpAddress = null)
public async Task RegisterAsync(string connectionId, string tenantName, int maximumConcurrentRequests,
IPAddress? remoteIpAddress = null)
{
_logger.LogDebug(22100,
"Registering connection {TransportConnectionId} for tenant {TenantName}",
connectionId, tenantName);

var registration =
ActivatorUtilities.CreateInstance<ConnectorRegistration>(_serviceProvider, tenantName, connectionId);
ActivatorUtilities.CreateInstance<ConnectorRegistration>(_serviceProvider, tenantName, connectionId,
maximumConcurrentRequests);

var transports = _tenants.GetOrAdd(tenantName, _ => new ConcurrentDictionary<string, IConnectorTransport<T>>());
transports[connectionId] = registration.ConnectorTransport;
Expand All @@ -80,7 +83,7 @@ await _connectionStatisticsWriter.SetConnectionTimeAsync(connectionId, tenantNam
public async Task UnregisterAsync(string connectionId)
{
if (_registrations.TryRemove(connectionId, out var registration) &&
_tenants.TryGetValue(registration.TenantName, out var connectors))
_tenants.TryGetValue(registration.TenantName, out var connectors))
{
_logger.LogDebug(22101, "Unregistering connection {TransportConnectionId} for tenant {TenantName}",
connectionId, registration.TenantName);
Expand All @@ -96,7 +99,8 @@ public async Task UnregisterAsync(string connectionId)
registration?.Dispose();
}

[LoggerMessage(22103, LogLevel.Warning, "Unknown connection {TransportConnectionId} to transport request {RelayRequestId} to")]
[LoggerMessage(22103, LogLevel.Warning,
"Unknown connection {TransportConnectionId} to transport request {RelayRequestId} to")]
partial void LogUnknownRequestConnection(string transportConnectionId, Guid relayRequestId);

/// <summary>
Expand Down Expand Up @@ -143,7 +147,8 @@ public Task AcknowledgeRequestAsync(string connectionId, string acknowledgeId,
return Task.CompletedTask;
}

[LoggerMessage(22105, LogLevel.Trace, "Delivering request {RelayRequestId} to local connection {TransportConnectionId}")]
[LoggerMessage(22105, LogLevel.Trace,
"Delivering request {RelayRequestId} to local connection {TransportConnectionId}")]
partial void LogDeliveringRequest(Guid relayRequestId, string? transportConnectionId);

/// <summary>
Expand Down Expand Up @@ -174,20 +179,21 @@ public async Task<bool> TryDeliverRequestAsync(T request, CancellationToken canc
private class ConnectorRegistration : IDisposable
{
public string TenantName { get; }

public IConnectorTransport<T> ConnectorTransport { get; }

public ITenantHandler TenantHandler { get; }

public ConnectorRegistration(string tenantName, string connectionId,
IConnectorTransportFactory<T> connectorTransportFactory,
ITenantHandlerFactory tenantHandlerFactory)
public ConnectorRegistration(string tenantName, string connectionId, int maximumConcurrentRequests,
IConnectorTransportFactory<T> connectorTransportFactory, ITenantHandlerFactory tenantHandlerFactory)
{
if (connectionId == null) throw new ArgumentNullException(nameof(connectionId));
if (connectorTransportFactory == null) throw new ArgumentNullException(nameof(connectorTransportFactory));
if (tenantHandlerFactory == null) throw new ArgumentNullException(nameof(tenantHandlerFactory));

TenantName = tenantName;
ConnectorTransport = connectorTransportFactory.Create(connectionId);
TenantHandler = tenantHandlerFactory.Create(tenantName, connectionId);
TenantHandler = tenantHandlerFactory.Create(tenantName, connectionId, maximumConcurrentRequests);
}

public void Dispose()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
using System;

namespace Thinktecture.Relay.Server.Transport;

/// <summary>
Expand All @@ -12,6 +10,7 @@ public interface ITenantHandlerFactory
/// </summary>
/// <param name="tenantName">The unique name of the tenant.</param>
/// <param name="connectionId">The unique id of the connection.</param>
/// <param name="maximumConcurrentRequests">The amount of maximum concurrent requests.</param>
/// <returns>An <see cref="ITenantHandler"/>.</returns>
ITenantHandler Create(string tenantName, string connectionId);
ITenantHandler Create(string tenantName, string connectionId, int maximumConcurrentRequests);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ public class Tenant
/// <example>On premise connector in the Thinktecture office in Karlsruhe</example>
public string? Description { get; set; }

/// <summary>
/// Gets or sets whether authentication is required for relaying a request to this tenant.
/// </summary>
public bool? RequireAuthentication { get; set; }

/// <summary>
/// Gets or sets the maximum concurrent connector requests.
/// </summary>
public int? MaximumConcurrentConnectorRequests { get; set; }

/// <summary>
/// Gets or sets the interval used to send keep alive pings between the server and a connector.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public static TenantModel ToModel(this TenantEntity tenant)
Name = tenant.Name,
DisplayName = tenant.DisplayName,
Description = tenant.Description,
RequireAuthentication = tenant.RequireAuthentication,
MaximumConcurrentConnectorRequests = tenant.MaximumConcurrentConnectorRequests,

// config properties
KeepAliveInterval = tenant.Config?.KeepAliveInterval,
Expand Down Expand Up @@ -71,7 +73,7 @@ public static TenantEntity ToEntity(this TenantModel tenant)
};
}

return new TenantEntity()
var entity = new TenantEntity()
{
Name = tenant.Name,
DisplayName = tenant.DisplayName,
Expand All @@ -87,6 +89,18 @@ public static TenantEntity ToEntity(this TenantModel tenant)
}
).ToList(),
};

if (tenant.RequireAuthentication.HasValue)
{
entity.RequireAuthentication = tenant.RequireAuthentication.Value;
}

if (tenant.MaximumConcurrentConnectorRequests.HasValue)
{
entity.MaximumConcurrentConnectorRequests = tenant.MaximumConcurrentConnectorRequests.Value;
}

return entity;
}

/// <summary>
Expand Down
Loading

0 comments on commit d7938fb

Please sign in to comment.