Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Function notifies losing LNS with a direct method #1577

Merged
merged 19 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ jobs:
INTEGRATIONTEST_RunningInCI: true
INTEGRATIONTEST_LoadTestLnsEndpoints: ${{ secrets.LOAD_TEST_LNS_ENDPOINTS }}
INTEGRATIONTEST_NumberOfLoadTestDevices: 10
INTEGRATIONTEST_NumberOfLoadTestConcentrators: 2
INTEGRATIONTEST_NumberOfLoadTestConcentrators: 4

steps:
- uses: actions/checkout@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,34 @@

namespace LoraKeysManagerFacade.FunctionBundler
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using LoRaTools.CommonAPI;
using LoRaWan;
using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public class DeduplicationExecutionItem : IFunctionBundlerExecutionItem
{
private readonly ILoRaDeviceCacheStore cacheStore;
private readonly IServiceClient serviceClient;

public DeduplicationExecutionItem(ILoRaDeviceCacheStore cacheStore)
private const string MessageIdKey = "MessageId";

public DeduplicationExecutionItem(
ILoRaDeviceCacheStore cacheStore,
IServiceClient serviceClient)
{
this.cacheStore = cacheStore;
this.serviceClient = serviceClient;
}

public async Task<FunctionBundlerExecutionState> ExecuteAsync(IPipelineExecutionContext context)
{
if (context is null) throw new System.ArgumentNullException(nameof(context));
if (context is null) throw new ArgumentNullException(nameof(context));
spygi marked this conversation as resolved.
Show resolved Hide resolved

context.Result.DeduplicationResult = await GetDuplicateMessageResultAsync(context.DevEUI, context.Request.GatewayId, context.Request.ClientFCntUp, context.Request.ClientFCntDown, context.Logger);

Expand Down Expand Up @@ -47,7 +58,8 @@ internal async Task<DuplicateMsgResult> GetDuplicateMessageResultAsync(DevEui de
{
if (await deviceCache.TryToLockAsync())
{
// we are owning the lock now
logger?.LogDebug("Obtained the lock to execute deduplication for device '{DevEui}' and frame counter up '{FrameCounterUp}'.", devEUI, clientFCntUp);
spygi marked this conversation as resolved.
Show resolved Hide resolved

if (deviceCache.TryGetInfo(out var cachedDeviceState))
{
var updateCacheState = false;
Expand All @@ -69,9 +81,48 @@ internal async Task<DuplicateMsgResult> GetDuplicateMessageResultAsync(DevEui de

if (updateCacheState)
{
var previousGateway = cachedDeviceState.GatewayId;

cachedDeviceState.FCntUp = clientFCntUp;
cachedDeviceState.GatewayId = gatewayId;
_ = deviceCache.StoreInfo(cachedDeviceState);

logger?.LogDebug("Previous connection owner was '{PreviousConnectionOwner}', current message was received from '{Gateway}'", previousGateway, gatewayId);
spygi marked this conversation as resolved.
Show resolved Hide resolved
if (previousGateway != gatewayId)
{
var loraC2DMessage = new LoRaCloudToDeviceMessage()
{
DevEUI = devEUI,
Fport = FramePort.AppMin,
MessageId = Guid.NewGuid().ToString()
};

using var scope = logger?.BeginScope(new Dictionary<string, object> { [MessageIdKey] = loraC2DMessage.MessageId });
spygi marked this conversation as resolved.
Show resolved Hide resolved
logger?.LogDebug("Invoking direct method on LNS '{PreviousConnectionOwner}' to drop connection for device '{DevEUI}'", previousGateway, devEUI);

var method = new CloudToDeviceMethod(LoraKeysManagerFacadeConstants.CloudToDeviceDropConnection, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
_ = method.SetPayloadJson(JsonConvert.SerializeObject(loraC2DMessage));

logger?.LogDebug("Payload constructed with device '{DevEui}' for LNS '{PreviousConnectionOwner}'", loraC2DMessage.DevEUI, previousGateway);
spygi marked this conversation as resolved.
Show resolved Hide resolved

try
{
var res = await this.serviceClient.InvokeDeviceMethodAsync(previousGateway, LoraKeysManagerFacadeConstants.NetworkServerModuleId, method);
p-schuler marked this conversation as resolved.
Show resolved Hide resolved
logger?.LogDebug("Invoked direct method on LNS '{PreviousConnectionOwner}'; status '{Status}'", previousGateway, res?.Status);

if (res == null || !HttpUtilities.IsSuccessStatusCode(res.Status))
spygi marked this conversation as resolved.
Show resolved Hide resolved
{
logger?.LogError("Failed to invoke direct method on LNS '{PreviousConnectionOwner}' to drop the connection for device '{DevEUI}'; status '{Status}'", previousGateway, devEUI, res?.Status);
}
}
catch (IotHubException ex)
{
logger?.LogError(ex, "Exception when invoking direct method on LNS '{PreviousConnectionOwner}' to drop the connection for device '{DevEUI}'", previousGateway, devEUI);

// The exception is not rethrown because closing the connection on the losing gateway
// is performed on best effort basis.
}
}
}
}
else
Expand Down
5 changes: 5 additions & 0 deletions LoRaEngine/LoraKeysManagerFacade/HttpUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,10 @@ public static ApiVersion GetRequestedVersion(this HttpRequest req)

return ApiVersion.Parse(versionText);
}

/// <summary>
/// Checks if the http status code indicates success.
/// </summary>
public static bool IsSuccessStatusCode(int statusCode) => statusCode is >= 200 and <= 299;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal static class LoraKeysManagerFacadeConstants
internal const string TwinProperty_NwkSKey = "NwkSKey";
internal const string NetworkServerModuleId = "LoRaWanNetworkSrvModule";
internal const string CloudToDeviceMessageMethodName = "cloudtodevicemessage";
internal const string CloudToDeviceDropConnection = "dropconnection";
public const string RoundTripDateTimeStringFormat = "o";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private async Task<IActionResult> SendMessageViaDirectMethodAsync(
_ = method.SetPayloadJson(JsonConvert.SerializeObject(c2dMessage));

var res = await this.serviceClient.InvokeDeviceMethodAsync(preferredGatewayID, LoraKeysManagerFacadeConstants.NetworkServerModuleId, method);
if (IsSuccessStatusCode(res.Status))
if (HttpUtilities.IsSuccessStatusCode(res.Status))
{
this.log.LogInformation("Direct method call to {gatewayID} and {devEUI} succeeded with {statusCode}", preferredGatewayID, devEUI, res.Status);

Expand Down Expand Up @@ -233,10 +233,5 @@ private async Task<IActionResult> SendMessageViaDirectMethodAsync(
};
}
}

/// <summary>
/// Gets if the http status code indicates success.
/// </summary>
private static bool IsSuccessStatusCode(int statusCode) => statusCode is >= 200 and <= 299;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ public sealed class ModuleConnectionHost : IAsyncDisposable
private readonly LoRaDeviceAPIServiceBase loRaDeviceAPIService;
private readonly ILogger<ModuleConnectionHost> logger;
private readonly Counter<int> unhandledExceptionCount;
private readonly Counter<int> forceClosedConnections;
private ILoraModuleClient loRaModuleClient;
private readonly ILoRaModuleClientFactory loRaModuleClientFactory;

public const string DroppedConnectionLog = "Device connection was dropped ";
spygi marked this conversation as resolved.
Show resolved Hide resolved

public ModuleConnectionHost(
NetworkServerConfiguration networkServerConfiguration,
IClassCDeviceMessageSender defaultClassCDevicesMessageSender,
Expand All @@ -45,6 +48,7 @@ public ModuleConnectionHost(
this.loRaModuleClientFactory = loRaModuleClientFactory ?? throw new ArgumentNullException(nameof(loRaModuleClientFactory));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.unhandledExceptionCount = (meter ?? throw new ArgumentNullException(nameof(meter))).CreateCounter<int>(MetricRegistry.UnhandledExceptions);
this.forceClosedConnections = meter.CreateCounter<int>(MetricRegistry.ForceClosedClientConnections);
}

public async Task CreateAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -90,13 +94,19 @@ internal async Task InitModuleAsync(CancellationToken cancellationToken)
internal async Task<MethodResponse> OnDirectMethodCalled(MethodRequest methodRequest, object userContext)
{
if (methodRequest == null) throw new ArgumentNullException(nameof(methodRequest));
this.logger.LogDebug("Direct method '{MethodName}' invoked with data '{JsonData}'.", methodRequest.Name, methodRequest.DataAsJson);

try
{
if (string.Equals(Constants.CloudToDeviceClearCache, methodRequest.Name, StringComparison.OrdinalIgnoreCase))
{
return await ClearCacheAsync();
}
else if (string.Equals(Constants.CloudToDeviceDropConnection, methodRequest.Name, StringComparison.OrdinalIgnoreCase))
{
this.logger.LogDebug("Will drop connection with data '{JsonData}'", methodRequest.DataAsJson);
return await DropConnectionAsync(methodRequest);
}
else if (string.Equals(Constants.CloudToDeviceDecoderElementName, methodRequest.Name, StringComparison.OrdinalIgnoreCase))
{
return await SendCloudToDeviceMessageAsync(methodRequest);
Expand All @@ -113,7 +123,7 @@ internal async Task<MethodResponse> OnDirectMethodCalled(MethodRequest methodReq
}
}

internal async Task<MethodResponse> SendCloudToDeviceMessageAsync(MethodRequest methodRequest)
private async Task<MethodResponse> SendCloudToDeviceMessageAsync(MethodRequest methodRequest)
{
if (!string.IsNullOrEmpty(methodRequest.DataAsJson))
{
Expand Down Expand Up @@ -149,6 +159,47 @@ private async Task<MethodResponse> ClearCacheAsync()
return new MethodResponse((int)HttpStatusCode.OK);
}

private async Task<MethodResponse> DropConnectionAsync(MethodRequest methodRequest)
{
this.forceClosedConnections.Add(1);
ReceivedLoRaCloudToDeviceMessage c2d = null;

this.logger.LogDebug("Will deserialize '{JsonData}'.", methodRequest.DataAsJson);
spygi marked this conversation as resolved.
Show resolved Hide resolved

try
{
c2d = JsonSerializer.Deserialize<ReceivedLoRaCloudToDeviceMessage>(methodRequest.DataAsJson, new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
}
catch (Exception ex) when (ex is ArgumentNullException or JsonException)
spygi marked this conversation as resolved.
Show resolved Hide resolved
{
this.logger.LogError(ex, "Unable to parse Json for direct method '{MethodName}' for device '{DevEui}', message id '{MessageId}'", methodRequest.Name, c2d?.DevEUI, c2d?.MessageId);
return new MethodResponse((int)HttpStatusCode.BadRequest);
}

using var cts = methodRequest.ResponseTimeout.HasValue ? new CancellationTokenSource(methodRequest.ResponseTimeout.Value) : null;
spygi marked this conversation as resolved.
Show resolved Hide resolved

if (c2d.DevEUI == null)
{
this.logger.LogError("DevEUI missing, cannot identify device to drop connection for; message Id '{MessageId}'", c2d.MessageId);
return new MethodResponse((int)HttpStatusCode.BadRequest);
}

using var scope = this.logger.BeginDeviceScope(c2d.DevEUI);

var loRaDevice = await this.loRaDeviceRegistry.GetDeviceByDevEUIAsync(c2d.DevEUI.Value);
if (loRaDevice == null)
{
this.logger.LogError("Could not retrieve LoRa device; message id '{MessageId}'", c2d.MessageId);
return new MethodResponse((int)HttpStatusCode.NotFound);
}

loRaDevice.IsConnectionOwner = false;
spygi marked this conversation as resolved.
Show resolved Hide resolved
await loRaDevice.CloseConnectionAsync(cts?.Token ?? CancellationToken.None, force: true);
this.logger.LogInformation(DroppedConnectionLog + "from gateway with id '{GatewayId}', message id '{MessageId}'", this.networkServerConfiguration.GatewayID, c2d.MessageId);

return new MethodResponse((int)HttpStatusCode.OK);
}

/// <summary>
/// Method to update the desired properties.
/// We only want to update the auth code if the facadeUri was performed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public static class Constants
/// </summary>
public const string CloudToDeviceClearCache = "clearcache";

/// <summary>
/// Property in decoder json response commanding LNS to drop the connection.
/// </summary>
public const string CloudToDeviceDropConnection = "dropconnection";

/// <summary>
/// Minimum value for device connection keep alive timeout (1 minute).
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal static class MetricRegistry
public static readonly CustomMetric DeviceLoadRequests = new CustomMetric("DeviceLoadRequests", "Number of device load requests issued against an API service", MetricType.Counter, new[] { GatewayIdTagName });
public static readonly CustomMetric TwinLoadRequests = new CustomMetric("TwinLoadRequests", "Number of device twin load requests issued against RegistryManager and DeviceClient", MetricType.Counter, new[] { GatewayIdTagName });
public static readonly CustomMetric ActiveClientConnections = new CustomMetric("ActiveClientConnections", "Number of active client connections", MetricType.ObservableGauge, new[] { GatewayIdTagName });
public static readonly CustomMetric ForceClosedClientConnections = new CustomMetric("ForceClosedClientConnections", "Number of device client connections requested to be closed by the FunctionBundler", MetricType.Counter, new[] { GatewayIdTagName });

private static readonly ICollection<CustomMetric> Registry = new[]
{
Expand All @@ -52,7 +53,8 @@ internal static class MetricRegistry
DeviceCacheHits,
DeviceLoadRequests,
TwinLoadRequests,
ActiveClientConnections
ActiveClientConnections,
ForceClosedClientConnections
};

public static readonly IDictionary<string, CustomMetric> RegistryLookup =
Expand Down Expand Up @@ -176,16 +178,16 @@ internal static class MetricsExtensions
public static Counter<T> CreateCounter<T>(this Meter meter, CustomMetric customMetric) where T : struct =>
customMetric.Type == MetricType.Counter
? meter.CreateCounter<T>(customMetric.Name, description: customMetric.Description)
: throw new ArgumentException("Custom metric must of type Counter", nameof(customMetric));
: throw new ArgumentException("Custom metric must be of type Counter", nameof(customMetric));

public static Histogram<T> CreateHistogram<T>(this Meter meter, CustomMetric customMetric) where T : struct =>
customMetric.Type == MetricType.Histogram
? meter.CreateHistogram<T>(customMetric.Name, description: customMetric.Description)
: throw new ArgumentException("Custom metric must of type Histogram", nameof(customMetric));
: throw new ArgumentException("Custom metric must be of type Histogram", nameof(customMetric));

public static ObservableGauge<T> CreateObservableGauge<T>(this Meter meter, CustomMetric customMetric, Func<T> observeValue) where T : struct =>
customMetric.Type == MetricType.ObservableGauge
? meter.CreateObservableGauge(customMetric.Name, observeValue, description: customMetric.Description)
: throw new ArgumentException("Custom metric must of type Histogram", nameof(customMetric));
: throw new ArgumentException("Custom metric must be of type Histogram", nameof(customMetric));
}
}
15 changes: 8 additions & 7 deletions Tests/Common/SimulatedDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ public sealed class SimulatedDevice
private static readonly IJsonReader<DevEui> DevEuiMessageReader =
JsonReader.Object(JsonReader.Property("DevEui", from d in JsonReader.String()
select DevEui.Parse(d)));
private readonly List<SimulatedBasicsStation> simulatedBasicsStations = new List<SimulatedBasicsStation>();

private readonly ConcurrentBag<string> receivedMessages = new ConcurrentBag<string>();
private readonly ILogger logger;

public IReadOnlyCollection<string> ReceivedMessages => this.receivedMessages;

public IReadOnlyCollection<SimulatedBasicsStation> SimulatedBasicsStations { get; set; }

public TestDeviceInfo LoRaDevice { get; internal set; }

public uint FrmCntUp { get; set; }
Expand Down Expand Up @@ -78,7 +79,7 @@ public SimulatedDevice(TestDeviceInfo testDeviceInfo, uint frmCntDown = 0, uint
FrmCntDown = frmCntDown;
FrmCntUp = frmCntUp;
this.logger = logger;
this.simulatedBasicsStations = simulatedBasicsStation?.ToList() ?? new List<SimulatedBasicsStation>();
SimulatedBasicsStations = simulatedBasicsStation?.ToList() ?? new List<SimulatedBasicsStation>();

void AddToDeviceMessageQueue(string response)
{
Expand All @@ -88,7 +89,7 @@ void AddToDeviceMessageQueue(string response)
}
}

foreach (var basicsStation in this.simulatedBasicsStations)
foreach (var basicsStation in SimulatedBasicsStations)
basicsStation.MessageReceived += (_, eventArgs) => AddToDeviceMessageQueue(eventArgs.Value);
}

Expand Down Expand Up @@ -257,7 +258,7 @@ private bool HandleJoinAccept(LoRaPayloadJoinAccept payload)

//// Sends unconfirmed message
public Task SendDataMessageAsync(LoRaRequest loRaRequest) =>
Task.WhenAll(from basicsStation in this.simulatedBasicsStations
Task.WhenAll(from basicsStation in SimulatedBasicsStations
select basicsStation.SendDataMessageAsync(loRaRequest, CancellationToken.None));

// Performs join
Expand Down Expand Up @@ -305,12 +306,12 @@ void OnMessageReceived(object sender, EventArgs<string> response)
}
}

foreach (var basicsStation in this.simulatedBasicsStations)
foreach (var basicsStation in SimulatedBasicsStations)
basicsStation.MessageReceived += OnMessageReceived;

try
{
foreach (var basicsStation in this.simulatedBasicsStations)
foreach (var basicsStation in SimulatedBasicsStations)
{
await basicsStation.SerializeAndSendMessageAsync(new
{
Expand Down Expand Up @@ -342,7 +343,7 @@ await basicsStation.SerializeAndSendMessageAsync(new
}
finally
{
foreach (var basicsStation in this.simulatedBasicsStations)
foreach (var basicsStation in SimulatedBasicsStations)
basicsStation.MessageReceived -= OnMessageReceived;
}
}
Expand Down
Loading