Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Function notifies losing LNS with a direct method #1577

Merged
merged 19 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/e2e-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ jobs:
INTEGRATIONTEST_RunningInCI: true
INTEGRATIONTEST_LoadTestLnsEndpoints: ${{ secrets.LOAD_TEST_LNS_ENDPOINTS }}
INTEGRATIONTEST_NumberOfLoadTestDevices: 10
INTEGRATIONTEST_NumberOfLoadTestConcentrators: 2
INTEGRATIONTEST_NumberOfLoadTestConcentrators: 4

steps:
- uses: actions/checkout@v2
Expand Down
4 changes: 3 additions & 1 deletion LoRaEngine/LoraKeysManagerFacade/FacadeStartup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ public override void Configure(IFunctionsHostBuilder builder)
.AddSingleton<IFunctionBundlerExecutionItem, DeduplicationExecutionItem>()
.AddSingleton<IFunctionBundlerExecutionItem, ADRExecutionItem>()
.AddSingleton<IFunctionBundlerExecutionItem, PreferredGatewayExecutionItem>()
.AddSingleton<LoRaDevAddrCache>();
.AddSingleton<LoRaDevAddrCache>()
.AddApplicationInsightsTelemetry();
}

private abstract class ConfigHandler
Expand Down Expand Up @@ -128,6 +129,7 @@ internal LocalConfigHandler()
internal override string RedisConnectionString => this.config.GetValue<string>(RedisConnectionStringKey);

internal override string IoTHubConnectionString => this.config.GetValue<string>(IoTHubConnectionStringKey);

internal override string StorageConnectionString => this.config.GetConnectionStringOrSetting(StorageConnectionStringKey);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,42 @@

namespace LoraKeysManagerFacade.FunctionBundler
{
using System;
using System.Threading.Tasks;
using LoRaTools.CommonAPI;
using LoRaWan;
using Microsoft.ApplicationInsights;
using Microsoft.ApplicationInsights.Extensibility;
using Microsoft.ApplicationInsights.Metrics;
using Microsoft.Azure.Devices;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

public class DeduplicationExecutionItem : IFunctionBundlerExecutionItem
{
private const string ConnectionOwnershipChangeMetricName = "ConnectionOwnershipChange";

private readonly ILoRaDeviceCacheStore cacheStore;
private readonly IServiceClient serviceClient;
private readonly Microsoft.ApplicationInsights.Metric connectionOwnershipChangedMetric;

public DeduplicationExecutionItem(ILoRaDeviceCacheStore cacheStore)
public DeduplicationExecutionItem(
ILoRaDeviceCacheStore cacheStore,
IServiceClient serviceClient,
TelemetryConfiguration telemetryConfiguration)
{
this.cacheStore = cacheStore;
this.serviceClient = serviceClient;

var telemetryClient = new TelemetryClient(telemetryConfiguration);
var metricIdentifier = new MetricIdentifier(LoraKeysManagerFacadeConstants.MetricNamespace, ConnectionOwnershipChangeMetricName);
this.connectionOwnershipChangedMetric = telemetryClient.GetMetric(metricIdentifier);
}

public async Task<FunctionBundlerExecutionState> ExecuteAsync(IPipelineExecutionContext context)
{
if (context is null) throw new System.ArgumentNullException(nameof(context));
ArgumentNullException.ThrowIfNull(context);

context.Result.DeduplicationResult = await GetDuplicateMessageResultAsync(context.DevEUI, context.Request.GatewayId, context.Request.ClientFCntUp, context.Request.ClientFCntDown, context.Logger);

Expand Down Expand Up @@ -47,7 +66,6 @@ internal async Task<DuplicateMsgResult> GetDuplicateMessageResultAsync(DevEui de
{
if (await deviceCache.TryToLockAsync())
{
// we are owning the lock now
if (deviceCache.TryGetInfo(out var cachedDeviceState))
{
var updateCacheState = false;
Expand All @@ -69,17 +87,52 @@ internal async Task<DuplicateMsgResult> GetDuplicateMessageResultAsync(DevEui de

if (updateCacheState)
{
var previousGateway = cachedDeviceState.GatewayId;

cachedDeviceState.FCntUp = clientFCntUp;
cachedDeviceState.GatewayId = gatewayId;
_ = deviceCache.StoreInfo(cachedDeviceState);

if (previousGateway != gatewayId)
{
this.connectionOwnershipChangedMetric.TrackValue(1);

var loraC2DMessage = new LoRaCloudToDeviceMessage()
{
DevEUI = devEUI,
Fport = FramePort.AppMin,
MessageId = Guid.NewGuid().ToString()
};

var method = new CloudToDeviceMethod(LoraKeysManagerFacadeConstants.CloudToDeviceCloseConnection, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));
_ = method.SetPayloadJson(JsonConvert.SerializeObject(loraC2DMessage));

try
{
var res = await this.serviceClient.InvokeDeviceMethodAsync(previousGateway, LoraKeysManagerFacadeConstants.NetworkServerModuleId, method);
p-schuler marked this conversation as resolved.
Show resolved Hide resolved
logger?.LogDebug("Connection owner changed and direct method was called on previous gateway '{PreviousConnectionOwner}' to close connection; result is '{Status}'", previousGateway, res?.Status);

if (!HttpUtilities.IsSuccessStatusCode(res.Status))
{
logger?.LogError("Failed to invoke direct method on LNS '{PreviousConnectionOwner}' to close the connection for device '{DevEUI}'; status '{Status}'", previousGateway, devEUI, res?.Status);
}
}
catch (IotHubException ex)
{
logger?.LogError(ex, "Exception when invoking direct method on LNS '{PreviousConnectionOwner}' to close the connection for device '{DevEUI}'", previousGateway, devEUI);

// The exception is not rethrown because closing the connection on the losing gateway
// is performed on best effort basis.
}
}
}
}
else
{
// initialize
isDuplicate = false;
var state = deviceCache.Initialize(clientFCntUp, clientFCntDown);
logger?.LogDebug("initialized state for {id}:{gwid} = {state}", devEUI, gatewayId, state);
logger?.LogDebug("Connection owner for {DevEui} set to {GatewayId}; state {State}", devEUI, gatewayId, state);
spygi marked this conversation as resolved.
Show resolved Hide resolved
}
}
else
Expand Down
5 changes: 5 additions & 0 deletions LoRaEngine/LoraKeysManagerFacade/HttpUtilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,10 @@ public static ApiVersion GetRequestedVersion(this HttpRequest req)

return ApiVersion.Parse(versionText);
}

/// <summary>
/// Checks if the http status code indicates success.
/// </summary>
public static bool IsSuccessStatusCode(int statusCode) => statusCode is >= 200 and <= 299;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<PackageReference Include="Microsoft.Extensions.Http" Version="6.0.0" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="$(MicrosoftNETSdkFunctionsVersion)" />
<PackageReference Include="StackExchange.Redis" Version="$(StackExchangeRedisVersion)" />
<PackageReference Include="Microsoft.ApplicationInsights.AspNetCore" Version="$(ApplicationInsightsVersion)" />
</ItemGroup>

<!-- Add shared files -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ internal static class LoraKeysManagerFacadeConstants
internal const string TwinProperty_NwkSKey = "NwkSKey";
internal const string NetworkServerModuleId = "LoRaWanNetworkSrvModule";
internal const string CloudToDeviceMessageMethodName = "cloudtodevicemessage";
internal const string CloudToDeviceCloseConnection = "closeconnection";
public const string RoundTripDateTimeStringFormat = "o";
internal const string MetricNamespace = "LoRaWAN";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private async Task<IActionResult> SendMessageViaDirectMethodAsync(
_ = method.SetPayloadJson(JsonConvert.SerializeObject(c2dMessage));

var res = await this.serviceClient.InvokeDeviceMethodAsync(preferredGatewayID, LoraKeysManagerFacadeConstants.NetworkServerModuleId, method);
if (IsSuccessStatusCode(res.Status))
if (HttpUtilities.IsSuccessStatusCode(res.Status))
{
this.log.LogInformation("Direct method call to {gatewayID} and {devEUI} succeeded with {statusCode}", preferredGatewayID, devEUI, res.Status);

Expand Down Expand Up @@ -233,10 +233,5 @@ private async Task<IActionResult> SendMessageViaDirectMethodAsync(
};
}
}

/// <summary>
/// Gets if the http status code indicates success.
/// </summary>
private static bool IsSuccessStatusCode(int statusCode) => statusCode is >= 200 and <= 299;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ public sealed class ModuleConnectionHost : IAsyncDisposable
private readonly LoRaDeviceAPIServiceBase loRaDeviceAPIService;
private readonly ILogger<ModuleConnectionHost> logger;
private readonly Counter<int> unhandledExceptionCount;
private readonly Counter<int> forceClosedConnections;
private ILoraModuleClient loRaModuleClient;
private readonly ILoRaModuleClientFactory loRaModuleClientFactory;

public const string ClosedConnectionLog = "Device connection was closed ";

public ModuleConnectionHost(
NetworkServerConfiguration networkServerConfiguration,
IClassCDeviceMessageSender defaultClassCDevicesMessageSender,
Expand All @@ -45,6 +48,7 @@ public ModuleConnectionHost(
this.loRaModuleClientFactory = loRaModuleClientFactory ?? throw new ArgumentNullException(nameof(loRaModuleClientFactory));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.unhandledExceptionCount = (meter ?? throw new ArgumentNullException(nameof(meter))).CreateCounter<int>(MetricRegistry.UnhandledExceptions);
this.forceClosedConnections = meter.CreateCounter<int>(MetricRegistry.ForceClosedClientConnections);
}

public async Task CreateAsync(CancellationToken cancellationToken)
Expand Down Expand Up @@ -97,6 +101,10 @@ internal async Task<MethodResponse> OnDirectMethodCalled(MethodRequest methodReq
{
return await ClearCacheAsync();
}
else if (string.Equals(Constants.CloudToDeviceCloseConnection, methodRequest.Name, StringComparison.OrdinalIgnoreCase))
{
return await CloseConnectionAsync(methodRequest);
}
else if (string.Equals(Constants.CloudToDeviceDecoderElementName, methodRequest.Name, StringComparison.OrdinalIgnoreCase))
{
return await SendCloudToDeviceMessageAsync(methodRequest);
Expand All @@ -113,7 +121,7 @@ internal async Task<MethodResponse> OnDirectMethodCalled(MethodRequest methodReq
}
}

internal async Task<MethodResponse> SendCloudToDeviceMessageAsync(MethodRequest methodRequest)
private async Task<MethodResponse> SendCloudToDeviceMessageAsync(MethodRequest methodRequest)
{
if (!string.IsNullOrEmpty(methodRequest.DataAsJson))
{
Expand Down Expand Up @@ -149,6 +157,45 @@ private async Task<MethodResponse> ClearCacheAsync()
return new MethodResponse((int)HttpStatusCode.OK);
}

private async Task<MethodResponse> CloseConnectionAsync(MethodRequest methodRequest)
{
ReceivedLoRaCloudToDeviceMessage c2d = null;

try
{
c2d = JsonSerializer.Deserialize<ReceivedLoRaCloudToDeviceMessage>(methodRequest.DataAsJson, new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
}
catch (Exception ex) when (ex is ArgumentNullException or JsonException)
spygi marked this conversation as resolved.
Show resolved Hide resolved
{
this.logger.LogError(ex, "Unable to parse Json for direct method '{MethodName}' for device '{DevEui}', message id '{MessageId}'", methodRequest.Name, c2d?.DevEUI, c2d?.MessageId);
return new MethodResponse((int)HttpStatusCode.BadRequest);
}

if (c2d.DevEUI == null)
{
this.logger.LogError("DevEUI missing, cannot identify device to close connection for; message Id '{MessageId}'", c2d.MessageId);
return new MethodResponse((int)HttpStatusCode.BadRequest);
}

using var scope = this.logger.BeginDeviceScope(c2d.DevEUI);

var loRaDevice = await this.loRaDeviceRegistry.GetDeviceByDevEUIAsync(c2d.DevEUI.Value);
if (loRaDevice == null)
{
this.logger.LogError("Could not retrieve LoRa device; message id '{MessageId}'", c2d.MessageId);
return new MethodResponse((int)HttpStatusCode.NotFound);
}

loRaDevice.IsConnectionOwner = false;
spygi marked this conversation as resolved.
Show resolved Hide resolved
using var cts = methodRequest.ResponseTimeout is { } timeout ? new CancellationTokenSource(timeout) : null;
await loRaDevice.CloseConnectionAsync(cts?.Token ?? CancellationToken.None, force: true);

this.logger.LogInformation(ClosedConnectionLog + "from gateway with id '{GatewayId}', message id '{MessageId}'", this.networkServerConfiguration.GatewayID, c2d.MessageId);
this.forceClosedConnections.Add(1);

return new MethodResponse((int)HttpStatusCode.OK);
}

/// <summary>
/// Method to update the desired properties.
/// We only want to update the auth code if the facadeUri was performed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public static class Constants
/// </summary>
public const string CloudToDeviceClearCache = "clearcache";

/// <summary>
/// Property in decoder json response commanding LNS to close the connection.
/// </summary>
public const string CloudToDeviceCloseConnection = "closeconnection";

/// <summary>
/// Minimum value for device connection keep alive timeout (1 minute).
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ internal static class MetricRegistry
public static readonly CustomMetric DeviceLoadRequests = new CustomMetric("DeviceLoadRequests", "Number of device load requests issued against an API service", MetricType.Counter, new[] { GatewayIdTagName });
public static readonly CustomMetric TwinLoadRequests = new CustomMetric("TwinLoadRequests", "Number of device twin load requests issued against RegistryManager and DeviceClient", MetricType.Counter, new[] { GatewayIdTagName });
public static readonly CustomMetric ActiveClientConnections = new CustomMetric("ActiveClientConnections", "Number of active client connections", MetricType.ObservableGauge, new[] { GatewayIdTagName });
public static readonly CustomMetric ForceClosedClientConnections = new CustomMetric("ForceClosedClientConnections", "Number of device client connections requested to be closed by the FunctionBundler", MetricType.Counter, new[] { GatewayIdTagName });

private static readonly ICollection<CustomMetric> Registry = new[]
{
Expand All @@ -52,7 +53,8 @@ internal static class MetricRegistry
DeviceCacheHits,
DeviceLoadRequests,
TwinLoadRequests,
ActiveClientConnections
ActiveClientConnections,
ForceClosedClientConnections
};

public static readonly IDictionary<string, CustomMetric> RegistryLookup =
Expand Down Expand Up @@ -176,16 +178,16 @@ internal static class MetricsExtensions
public static Counter<T> CreateCounter<T>(this Meter meter, CustomMetric customMetric) where T : struct =>
customMetric.Type == MetricType.Counter
? meter.CreateCounter<T>(customMetric.Name, description: customMetric.Description)
: throw new ArgumentException("Custom metric must of type Counter", nameof(customMetric));
: throw new ArgumentException("Custom metric must be of type Counter", nameof(customMetric));

public static Histogram<T> CreateHistogram<T>(this Meter meter, CustomMetric customMetric) where T : struct =>
customMetric.Type == MetricType.Histogram
? meter.CreateHistogram<T>(customMetric.Name, description: customMetric.Description)
: throw new ArgumentException("Custom metric must of type Histogram", nameof(customMetric));
: throw new ArgumentException("Custom metric must be of type Histogram", nameof(customMetric));

public static ObservableGauge<T> CreateObservableGauge<T>(this Meter meter, CustomMetric customMetric, Func<T> observeValue) where T : struct =>
customMetric.Type == MetricType.ObservableGauge
? meter.CreateObservableGauge(customMetric.Name, observeValue, description: customMetric.Description)
: throw new ArgumentException("Custom metric must of type Histogram", nameof(customMetric));
: throw new ArgumentException("Custom metric must be of type Histogram", nameof(customMetric));
}
}
Loading