Skip to content

Commit

Permalink
Support local and elevation in edge API (Azure#2241)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcschier authored Jun 10, 2024
1 parent a49d38f commit 9ddd1ac
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 72 deletions.
3 changes: 3 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ jobs:
pool:
vmImage: 'ubuntu-latest'
displayName: Trigger Pull Request Build on Governed Pipeline
variables:
message: $[ replace(variables['Build.SourceVersionMessage'], ',', '') ]
steps:
- task: TriggerBuild@4
inputs:
Expand All @@ -28,6 +30,7 @@ jobs:
dependentOnFailedBuildCondition: false
checkbuildsoncurrentbranch: false
failTaskIfConditionsAreNotFulfilled: false
buildParameters: "BuildCommitMessage: $(message)"
templateParameters: 'ref: $(Build.SourceBranch)'
- task: PowerShell@2
inputs:
Expand Down
8 changes: 8 additions & 0 deletions src/Azure.IIoT.OpcUa.Publisher.Models/src/ConnectionModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

namespace Azure.IIoT.OpcUa.Publisher.Models
{
using System.Collections.Generic;
using System.Runtime.Serialization;

/// <summary>
Expand Down Expand Up @@ -41,6 +42,13 @@ public sealed record class ConnectionModel
EmitDefaultValue = false)]
public string? Group { get; set; }

/// <summary>
/// Optional list of preferred locales in preference order.
/// </summary>
[DataMember(Name = "locales", Order = 4,
EmitDefaultValue = false)]
public IReadOnlyList<string>? Locales { get; set; }

/// <summary>
/// Connection options to apply to the created
/// connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public sealed record class RequestHeaderModel
public CredentialModel? Elevation { get; set; }

/// <summary>
/// Optional list of locales in preference order.
/// Optional list of preferred locales in preference order.
/// </summary>
[DataMember(Name = "locales", Order = 1,
EmitDefaultValue = false)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public static Task<VariantValue> ReadValueAsync<T>(this IOpcUaClientManager<T> c
nodesToRead, ct).ConfigureAwait(false);
return new JsonVariantEncoder(context.Session.MessageContext, serializer)
.Encode(response.Results[0].WrappedValue, out var tmp);
}, ct);
}, ct: ct);
}
}
}
36 changes: 18 additions & 18 deletions src/Azure.IIoT.OpcUa.Publisher/src/Services/NodeServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Task<ServerCapabilitiesModel> GetServerCapabilitiesAsync(T endpoint,
{
return _client.ExecuteAsync(endpoint, async context =>
await context.Session.GetServerCapabilitiesAsync(GetNamespaceFormat(header),
ct).ConfigureAwait(false), ct: ct);
ct).ConfigureAwait(false), header, ct);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -142,7 +142,7 @@ public async Task<BrowseFirstResponseModel> BrowseFirstAsync(T endpoint,
ContinuationToken = context.TrackedToken,
ErrorInfo = errorInfo ?? nodeError
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -187,15 +187,15 @@ public async Task<BrowseNextResponseModel> BrowseNextAsync(T endpoint,
ContinuationToken = context.TrackedToken,
ErrorInfo = results[0].ErrorInfo
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
public IAsyncEnumerable<BrowseStreamChunkModel> BrowseAsync(T endpoint,
BrowseStreamRequestModel request, CancellationToken ct)
{
var stream = new BrowseStream(request, this, _activitySource, ct);
return _client.ExecuteAsync(endpoint, stream.Stack, ct);
return _client.ExecuteAsync(endpoint, stream.Stack, request.Header, ct);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -247,7 +247,7 @@ await AddTargetsToBrowseResultAsync(context.Session,
Targets = targets,
ErrorInfo = results[0].ErrorInfo
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -402,7 +402,7 @@ await context.Session.CollectTypeHierarchyAsync(request.Header.ToRequestHeader()
Declarations = declarations
}
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);

static NodeType GetNodeType(List<NodeId> hierarchy)
{
Expand Down Expand Up @@ -463,7 +463,7 @@ public async Task<QueryCompilationResponseModel> CompileQueryAsync(T endpoint,
EventFilter = eventFilter,
ErrorInfo = context.ErrorInfo
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}
catch (Exception ex)
{
Expand Down Expand Up @@ -595,7 +595,7 @@ public async Task<MethodMetadataResponseModel> GetMethodMetadataAsync(
}
}
return result;
}, ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -804,7 +804,7 @@ public async Task<MethodCallResponseModel> MethodCallAsync(T endpoint,
Results = arguments,
ErrorInfo = results[0].ErrorInfo
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -877,7 +877,7 @@ public async Task<ValueReadResponseModel> ValueReadAsync(T endpoint,
DataType = type == BuiltInType.Null ? null : type.ToString(),
ErrorInfo = values[0].ErrorInfo
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -942,7 +942,7 @@ public async Task<ValueWriteResponseModel> ValueWriteAsync(T endpoint,
{
ErrorInfo = values.ErrorInfo ?? values[0].ErrorInfo
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -993,7 +993,7 @@ public async Task<ReadResponseModel> ReadAsync(T endpoint,
};
}).ToList()
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -1044,7 +1044,7 @@ public async Task<WriteResponseModel> WriteAsync(T endpoint,
};
}).ToList()
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand All @@ -1053,7 +1053,7 @@ public Task<HistoryServerCapabilitiesModel> HistoryGetServerCapabilitiesAsync(
{
return _client.ExecuteAsync(endpoint, async context =>
await context.Session.GetHistoryCapabilitiesAsync(GetNamespaceFormat(header),
ct).ConfigureAwait(false), ct: ct);
ct).ConfigureAwait(false), header, ct);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -1197,7 +1197,7 @@ public async Task<HistoryConfigurationResponseModel> HistoryGetConfigurationAsyn
EndOfArchive = endTime
}
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -1315,7 +1315,7 @@ public async Task<HistoryReadResponseModel<TResult>> HistoryReadAsync<TInput, TR
History = history,
ErrorInfo = errorInfo
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -1371,7 +1371,7 @@ public async Task<HistoryReadNextResponseModel<TResult>> HistoryReadNextAsync<TR
History = history,
ErrorInfo = errorInfo
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -1421,7 +1421,7 @@ public async Task<HistoryUpdateResponseModel> HistoryUpdateAsync<TInput>(
Results = inner.Select(r => r.ResultInfo).ToList(),
ErrorInfo = inner.ErrorInfo
};
}, ct: ct).ConfigureAwait(false);
}, request.Header, ct).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
namespace Azure.IIoT.OpcUa.Publisher.Stack
{
using Azure.IIoT.OpcUa.Publisher.Models;
using Azure.IIoT.OpcUa.Encoders;
using Furly.Extensions.Serializers;
using Opc.Ua;
using Opc.Ua.Extensions;
Expand Down Expand Up @@ -35,21 +34,6 @@ public static RequestHeader ToRequestHeader(this RequestHeaderModel? header,
header?.Diagnostics?.TimeStamp, timeoutHint);
}

/// <summary>
/// Convert diagnostics to request header
/// </summary>
/// <param name="context"></param>
/// <param name="level"></param>
/// <param name="timestamp"></param>
/// <param name="timeoutHint"></param>
/// <returns></returns>
public static RequestHeader ToRequestHeader(this OperationContextModel? context,
DiagnosticsLevel? level = null, DateTime? timestamp = null,
uint timeoutHint = 0)
{
return level.ToRequestHeader(context?.AuthorityId, timestamp, timeoutHint);
}

/// <summary>
/// Convert diagnostics to request header
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ public interface IOpcUaClientManager<T>
/// <typeparam name="TResult"></typeparam>
/// <param name="connection"></param>
/// <param name="func"></param>
/// <param name="header"></param>
/// <param name="ct"></param>
/// <returns></returns>
Task<TResult> ExecuteAsync<TResult>(T connection,
Func<ServiceCallContext, Task<TResult>> func,
CancellationToken ct = default);
RequestHeaderModel? header = null, CancellationToken ct = default);

/// <summary>
/// Execute the functions from stack on the provided
Expand All @@ -60,10 +61,11 @@ Task<TResult> ExecuteAsync<TResult>(T connection,
/// <typeparam name="TResult"></typeparam>
/// <param name="connection"></param>
/// <param name="stack"></param>
/// <param name="header"></param>
/// <param name="ct"></param>
/// <returns></returns>
IAsyncEnumerable<TResult> ExecuteAsync<TResult>(T connection,
Stack<Func<ServiceCallContext, ValueTask<IEnumerable<TResult>>>> stack,
CancellationToken ct = default);
RequestHeaderModel? header = null, CancellationToken ct = default);
}
}
22 changes: 13 additions & 9 deletions src/Azure.IIoT.OpcUa.Publisher/src/Stack/Services/OpcUaClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -947,7 +947,7 @@ async ValueTask ApplySubscriptionAsync(IReadOnlyList<IOpcUaSubscription> subscri
HashSet<IOpcUaSubscription> extra, CancellationToken cancellationToken = default)
{
var numberOfSubscriptions = subscriptions.Count + extra.Count;
_logger.LogDebug("{Client}: Applying changes to {Count} subscriptions...",
_logger.LogDebug("{Client}: Applying changes to {Count} subscriptions...",
this, numberOfSubscriptions);
var sw = Stopwatch.StartNew();

Expand Down Expand Up @@ -1160,14 +1160,18 @@ private async ValueTask<bool> TryConnectAsync(CancellationToken ct)
_logger.LogInformation(
"#{Attempt} - {Client}: Creating session {Name} with endpoint {EndpointUrl}...",
++attempt, this, _sessionName, endpointUrl);
// Create the session with english as default and current language
// locale as backup
var preferredLocales = new HashSet<string>
{
"en-US",
CultureInfo.CurrentCulture.Name
}.ToList();

var preferredLocales = _connection.Locales?.ToList() ?? new List<string>();
if (preferredLocales.Count == 0)
{
// Create the session with english as default
preferredLocales.Add("en-US");
if (CultureInfo.CurrentCulture.Name != preferredLocales[0])
{
// and current language locale as backup
preferredLocales.Add(CultureInfo.CurrentCulture.Name);
}
}
var sessionTimeout = SessionTimeout ?? TimeSpan.FromSeconds(30);
var session = await CreateAsync(_configuration,
_reverseConnectManager, endpoint,
Expand Down Expand Up @@ -1444,7 +1448,7 @@ async ValueTask DisposeAsync(OpcUaSession session)
{
await session.CloseAsync(CancellationToken.None).ConfigureAwait(false);

_logger.LogDebug("{Client}: Successfully closed session {Session}.",
_logger.LogDebug("{Client}: Successfully closed session {Session}.",
this, session);
}
catch (Exception ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,11 @@ public async Task<X509CertificateChainModel> GetEndpointCertificateAsync(

/// <inheritdoc/>
public async Task<T> ExecuteAsync<T>(ConnectionModel connection,
Func<ServiceCallContext, Task<T>> func, CancellationToken ct)
Func<ServiceCallContext, Task<T>> func, RequestHeaderModel? header,
CancellationToken ct)
{
if (connection.Endpoint == null)
{
throw new ArgumentNullException(nameof(connection));
}
if (string.IsNullOrEmpty(connection.Endpoint.Url))
connection = UpdateConnectionFromHeader(connection, header);
if (string.IsNullOrEmpty(connection.Endpoint?.Url))
{
throw new ArgumentException("Missing endpoint url", nameof(connection));
}
Expand All @@ -237,13 +235,10 @@ public async Task<T> ExecuteAsync<T>(ConnectionModel connection,
/// <inheritdoc/>
public IAsyncEnumerable<T> ExecuteAsync<T>(ConnectionModel connection,
Stack<Func<ServiceCallContext, ValueTask<IEnumerable<T>>>> stack,
CancellationToken ct)
RequestHeaderModel? header, CancellationToken ct)
{
if (connection.Endpoint == null)
{
throw new ArgumentNullException(nameof(connection));
}
if (string.IsNullOrEmpty(connection.Endpoint.Url))
connection = UpdateConnectionFromHeader(connection, header);
if (string.IsNullOrEmpty(connection.Endpoint?.Url))
{
throw new ArgumentException("Missing endpoint url", nameof(connection));
}
Expand Down Expand Up @@ -391,6 +386,36 @@ private async Task DiscoverAsync(Uri discoveryUrl, StringCollection? localeIds,
}
}

/// <summary>
/// Update connection from header
/// </summary>
/// <param name="connection"></param>
/// <param name="header"></param>
/// <returns></returns>
private static ConnectionModel UpdateConnectionFromHeader(ConnectionModel connection,
RequestHeaderModel? header)
{
if (header == null)
{
return connection;
}
if (header.Elevation != null)
{
connection = connection with
{
User = header.Elevation,
};
}
if (header.Locales != null)
{
connection = connection with
{
Locales = header.Locales
};
}
return connection;
}

/// <summary>
/// Create discovery url from string
/// </summary>
Expand Down
Loading

0 comments on commit 9ddd1ac

Please sign in to comment.