Skip to content

Commit

Permalink
Send uncertain or bad status code heartbeats when session is in error…
Browse files Browse the repository at this point in the history
… state or disconnected (Azure#2242)

The `NotifySessionConnectionState` method has been added to the `IOpcUaSubscription` interface and `OpcUaSubscription.cs` to notify when the session is disconnected. The heartbeat value status code is updated to a connectivity error state during disconnection.  It is restored upon reconnect or the next good value is received.

This commit introduces a command line option to disable actively handling reconnecting a session when publishing errors occur due to connectivity issues. 

Updated the `buildParameters` in the `azure-pipelines.yml` file to include the repository name, source branch, and commit message. This change enhances the detail of information provided in the build parameters.
  • Loading branch information
marcschier authored Jun 10, 2024
1 parent 9ddd1ac commit f50fe0e
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 13 deletions.
2 changes: 1 addition & 1 deletion azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
dependentOnFailedBuildCondition: false
checkbuildsoncurrentbranch: false
failTaskIfConditionsAreNotFulfilled: false
buildParameters: "BuildCommitMessage: $(message)"
buildParameters: 'BuildCommitMessage: "$(Build.Repository.Name) $(Build.SourceBranchName) $(message)"'
templateParameters: 'ref: $(Build.SourceBranch)'
- task: PowerShell@2
inputs:
Expand Down
9 changes: 9 additions & 0 deletions docs/opc-publisher/commandline.md
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,15 @@ OPC UA Client configuration
want to ensure the complex types are never
loaded for an endpoint.
Default: `false`.
--peh, --activepublisherrorhandling, --ActivePublishErrorHandling[=VALUE]
Actively handle reconnecting a session when
publishing errors occur due to issues in the
underlying connectivity rather than letting the
stack and keep alive handling manage
reconnecting.
Note that the default will be `false` in future
releases.
Default: `True`.
--otl, --opctokenlifetime, --SecurityTokenLifetime=VALUE
OPC UA Stack Transport Secure Channel - Security
token lifetime in milliseconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,9 @@ public CommandLine(string[] args, CommandLineLogger? logger = null)
{ $"dcp|disablecomplextypepreloading:|{OpcUaClientConfig.DisableComplexTypePreloadingKey}:",
"Complex types (structures, enumerations) a server exposes are preloaded from the server after the session is connected. In some cases this can cause problems either on the client or server itself. Use this setting to disable pre-loading support.\nNote that since the complex type system is used for meta data messages it will still be loaded at the time the subscription is created, therefore also disable meta data support if you want to ensure the complex types are never loaded for an endpoint.\nDefault: `false`.\n",
(bool? b) => this[OpcUaClientConfig.DisableComplexTypePreloadingKey] = b?.ToString() ?? "True" },
{ $"peh|activepublisherrorhandling:|{OpcUaClientConfig.ActivePublishErrorHandlingKey}:",
$"Actively handle reconnecting a session when publishing errors occur due to issues in the underlying connectivity rather than letting the stack and keep alive handling manage reconnecting.\nNote that the default will be `false` in future releases.\nDefault: `{OpcUaClientConfig.ActivePublishErrorHandlingDefault}`.\n",
(bool? b) => this[OpcUaClientConfig.ActivePublishErrorHandlingKey] = b?.ToString() ?? "True" },

{ $"otl|opctokenlifetime=|{OpcUaClientConfig.SecurityTokenLifetimeKey}=",
"OPC UA Stack Transport Secure Channel - Security token lifetime in milliseconds.\nDefault: `3600000` (1h).\n",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ ValueTask SyncWithSessionAsync(ISession session,
bool TryGetCurrentPosition(out uint subscriptionId,
out uint sequenceNumber);

/// <summary>
/// Notifiy session disconnected/reconnecting
/// </summary>
/// <returns></returns>
void NotifySessionConnectionState(bool disconnected);

/// <summary>
/// Notifies the subscription that should remove
/// itself from the session. If the session is null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public sealed class OpcUaClientConfig : PostConfigureOptionBase<OpcUaClientOptio
public const string ReverseConnectPortKey = "ReverseConnectPort";
public const string DisableComplexTypePreloadingKey = "DisableComplexTypePreloading";
public const string PublishRequestsPerSubscriptionPercentKey = "PublishRequestsPerSubscriptionPercent";
public const string ActivePublishErrorHandlingKey = "ActivePublishErrorHandling";
public const string MinPublishRequestsKey = "MinPublishRequests";
public const string MaxNodesPerBrowseOverrideKey = "MaxNodesPerBrowseOverride";
public const string MaxNodesPerReadOverrideKey = "MaxNodesPerReadOverride";
Expand Down Expand Up @@ -107,6 +108,7 @@ public sealed class OpcUaClientConfig : PostConfigureOptionBase<OpcUaClientOptio
public const bool RejectSha1SignedCertificatesDefault = false;
public const bool AddAppCertToTrustedStoreDefault = true;
public const bool RejectUnknownRevocationStatusDefault = true;
public const bool ActivePublishErrorHandlingDefault = true;
public const int MinPublishRequestsDefault = 3;
public const int PublishRequestsPerSubscriptionPercentDefault = 100;
#pragma warning restore CS1591 // Missing XML comment for publicly visible type or member
Expand Down Expand Up @@ -249,6 +251,9 @@ public override void PostConfigure(string? name, OpcUaClientOptions options)
PublishRequestsPerSubscriptionPercentKey,
PublishRequestsPerSubscriptionPercentDefault);

options.ActivePublishErrorHandling ??= GetBoolOrDefault(
ActivePublishErrorHandlingKey, ActivePublishErrorHandlingDefault);

options.MaxNodesPerReadOverride ??= GetIntOrNull(MaxNodesPerReadOverrideKey);

options.MaxNodesPerBrowseOverride ??= GetIntOrNull(MaxNodesPerBrowseOverrideKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,5 +136,12 @@ public sealed class OpcUaClientOptions
/// Limit max nodes to browse in a batch operation
/// </summary>
public int? MaxNodesPerBrowseOverride { get; set; }

/// <summary>
/// Manage the connectivity of the session and state
/// actively when publishing errors occur that are
/// related to session connectivity.
/// </summary>
public bool? ActivePublishErrorHandling { get; set; }
}
}
31 changes: 19 additions & 12 deletions src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ internal sealed partial class OpcUaClient : DefaultSessionFactory, IOpcUaClient,
/// </summary>
public bool DisableComplexTypePreloading { get; set; }

/// <summary>
/// Do active error handling on the publish path
/// </summary>
public bool ActivePublishErrorHandling { get; set; }

/// <summary>
/// Operation limits to use in the sessions
/// </summary>
Expand Down Expand Up @@ -688,6 +693,7 @@ private async Task ManageSessionStateMachineAsync(CancellationToken ct)
// If currently reconnecting, dispose the reconnect handler and stop timer
_reconnectHandler.CancelReconnect();
NotifyConnectivityStateChange(EndpointConnectivityState.Disconnected);
currentSubscriptions.ForEach(h => h.NotifySessionConnectionState(true));

// Clean up
await CloseSessionAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -739,6 +745,7 @@ await ApplySubscriptionAsync(currentSubscriptions, queuedSubscriptions,
ct).ConfigureAwait(false);

currentSessionState = SessionState.Connected;
currentSubscriptions.ForEach(h => h.NotifySessionConnectionState(false));
break;
case SessionState.Disconnected:
case SessionState.Connected:
Expand Down Expand Up @@ -804,6 +811,8 @@ await ApplySubscriptionAsync(new[] { item }, queuedSubscriptions,
_session = null;
NotifyConnectivityStateChange(EndpointConnectivityState.Connecting);
currentSessionState = SessionState.Reconnecting;
_reconnectingSession?.SubscriptionHandles
.ForEach(h => h.NotifySessionConnectionState(true));
break;
case SessionState.Connecting:
case SessionState.Disconnected:
Expand Down Expand Up @@ -868,6 +877,7 @@ await ApplySubscriptionAsync(currentSubscriptions, queuedSubscriptions,

_reconnectRequired = 0;
currentSessionState = SessionState.Connected;
currentSubscriptions.ForEach(h => h.NotifySessionConnectionState(false));
break;

case SessionState.Connected:
Expand Down Expand Up @@ -909,6 +919,8 @@ await ApplySubscriptionAsync(currentSubscriptions, queuedSubscriptions,
}

NotifyConnectivityStateChange(EndpointConnectivityState.Disconnected);
_session?.SubscriptionHandles
.ForEach(h => h.NotifySessionConnectionState(true));

// Clean up
await CloseSessionAsync().ConfigureAwait(false);
Expand Down Expand Up @@ -1234,10 +1246,16 @@ internal void Session_HandlePublishError(ISession session, PublishErrorEventArgs
this, limit);
return;
default:
_logger.LogInformation("{Client}: Publish error: {Error}...", this, e.Status);
_logger.LogInformation("{Client}: Publish error: {Error} (Actively handled: {Active})...",
this, e.Status, ActivePublishErrorHandling);
break;
}

if (!ActivePublishErrorHandling)
{
return;
}

switch (e.Status.Code)
{
case StatusCodes.BadSessionIdInvalid:
Expand All @@ -1258,20 +1276,9 @@ internal void Session_HandlePublishError(ISession session, PublishErrorEventArgs
TriggerReconnect(e.Status);
}
return;
case StatusCodes.BadTooManyOperations:
SetCode(e.Status, StatusCodes.BadServerHalted);
break;
}
// Reset timeout counter - we only care about subsequent timeouts
_publishTimeoutCounter = 0;

// Reach into the private field and update it.
static void SetCode(ServiceResult status, uint fixup)
{
typeof(ServiceResult).GetField("m_code",
System.Reflection.BindingFlags.NonPublic |
System.Reflection.BindingFlags.Instance)?.SetValue(status, fixup);
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ private OpcUaClient GetOrAddClient(ConnectionModel connection)
TimeSpan.FromMilliseconds(_options.Value.Quotas.OperationTimeout),
DisableComplexTypePreloading = _options.Value.DisableComplexTypePreloading ?? false,
ActivePublishErrorHandling = _options.Value.ActivePublishErrorHandling ?? false,
MinReconnectDelay = _options.Value.MinReconnectDelayDuration,
CreateSessionTimeout = _options.Value.CreateSessionTimeoutDuration,
KeepAliveInterval = _options.Value.KeepAliveIntervalDuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,39 @@ public override bool TryGetMonitoredItemNotifications(uint sequenceNumber, DateT
return base.TryGetMonitoredItemNotifications(sequenceNumber, timestamp, evt, notifications);
}

/// <inheritdoc/>
public override void NotifySessionConnectionState(bool disconnected)
{
//
// We change the reference here - we cloned the value and if it has been
// updated while we are doing this, a new value will be in in place and we
// should be connected again or we would not have received it.
//
var lastValue = LastReceivedValue as MonitoredItemNotification;
if (lastValue?.Value != null)
{
if (disconnected)
{
_lastStatusCode = lastValue.Value.StatusCode;
if (IsGoodDataValue(lastValue.Value))
{
lastValue.Value.StatusCode =
StatusCodes.UncertainNoCommunicationLastUsableValue;
}
else
{
lastValue.Value.StatusCode =
StatusCodes.BadNoCommunication;
}
}
else if (_lastStatusCode.HasValue)
{
lastValue.Value.StatusCode = _lastStatusCode.Value;
_lastStatusCode = null; // This is safe as we are called from the client thread
}
}
}

/// <summary>
/// TODO: What is a Good value? Right now we say that it must either be full good or
/// have a value and not a bad status code (to cover Good_, and Uncertain_ as well)
Expand Down Expand Up @@ -331,6 +364,7 @@ private void SendHeartbeatNotifications(object? sender, System.Timers.ElapsedEve
private TimeSpan _heartbeatInterval;
private Callback? _callback;
private DateTime? _lastValueReceived;
private StatusCode? _lastStatusCode;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ public abstract ValueTask GetMetaDataAsync(IOpcUaSession session,
ComplexTypeSystem? typeSystem, List<PublishedFieldMetaDataModel> fields,
NodeIdDictionary<object> dataTypes, CancellationToken ct);

/// <summary>
/// Called when the underlying session is disconnected
/// </summary>
public virtual void NotifySessionConnectionState(bool disconnected)
{
}

/// <summary>
/// Dispose
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ public bool TryGetCurrentPosition(out uint subscriptionId, out uint sequenceNumb
return _useDeferredAcknoledge;
}

/// <inheritdoc/>
public void NotifySessionConnectionState(bool disconnected)
{
foreach (var item in CurrentlyMonitored)
{
item.NotifySessionConnectionState(disconnected);
}
}

/// <inheritdoc/>
public IOpcUaSubscriptionNotification? CreateKeepAlive()
{
Expand Down

0 comments on commit f50fe0e

Please sign in to comment.