diff --git a/src/Azure.IIoT.OpcUa.Publisher.Models/src/WriterGroupTransport.cs b/src/Azure.IIoT.OpcUa.Publisher.Models/src/WriterGroupTransport.cs index 81b7f00c0f..3b7921ff4d 100644 --- a/src/Azure.IIoT.OpcUa.Publisher.Models/src/WriterGroupTransport.cs +++ b/src/Azure.IIoT.OpcUa.Publisher.Models/src/WriterGroupTransport.cs @@ -25,6 +25,12 @@ public enum WriterGroupTransport [EnumMember(Value = "Mqtt")] Mqtt, + /// + /// Azure Event Hub + /// + [EnumMember(Value = "EventHub")] + EventHub, + /// /// Dapr /// diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Azure.IIoT.OpcUa.Publisher.Module.csproj b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Azure.IIoT.OpcUa.Publisher.Module.csproj index 4fa0b24ae9..a65299c38f 100644 --- a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Azure.IIoT.OpcUa.Publisher.Module.csproj +++ b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Azure.IIoT.OpcUa.Publisher.Module.csproj @@ -37,6 +37,8 @@ + + diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs index 07d5cf67a1..dc2a161828 100644 --- a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs +++ b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/CommandLine.cs @@ -198,6 +198,12 @@ public CommandLine(string[] args, CommandLineLogger? logger = null) { $"ht|ih=|iothubprotocol=|{Configuration.IoTEdge.HubTransport}=", $"Protocol to use for communication with EdgeHub.\nAllowed values:\n `{string.Join("`\n `", Enum.GetNames(typeof(TransportOption)))}`\nDefault: `{nameof(TransportOption.Mqtt)}` if device or edge hub connection string is provided, ignored otherwise.\n", (TransportOption p) => this[Configuration.IoTEdge.HubTransport] = p.ToString() }, + { $"eh=|eventhubnamespaceconnectionstring=|{Configuration.EventHubs.EventHubNamespaceConnectionString}=", + "The connection string of an existing event hub namespace to use for the Azure EventHub transport.\nDefault: `not set`.\n", + eh => this[Configuration.EventHubs.EventHubNamespaceConnectionString] = eh }, + { $"sg=|schemagroup=|{Configuration.EventHubs.SchemaGroupNameKey}=", + "The schema group in an event hub namespace to publish message schemas to.\nDefault: `not set`.\n", + sg => this[Configuration.EventHubs.SchemaGroupNameKey] = sg }, { $"d|dcs=|daprconnectionstring=|{Configuration.Dapr.DaprConnectionStringKey}=", $"Connect the OPC Publisher to a dapr pub sub component using a connection string.\nThe connection string specifies the PubSub component to use and allows you to configure the side car connection if needed.\nUse the format 'PubSubComponent=[;GrpcPort=;HttpPort=[;Scheme=<'https'|'http'>][;Host=]][;ApiKey=]'.\nTo publish through dapr by default specify `-t={nameof(WriterGroupTransport.Dapr)}`.\nDefault: `not set`.\n", dcs => this[Configuration.Dapr.DaprConnectionStringKey] = dcs }, diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/Configuration.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/Configuration.cs index ba3d69fc1d..ce5241f20a 100644 --- a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/Configuration.cs +++ b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/Configuration.cs @@ -7,6 +7,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Module.Runtime { using Azure.IIoT.OpcUa.Publisher.Module.Controllers; using Autofac; + using Furly.Azure.EventHubs; using Furly.Azure.IoT.Edge; using Furly.Extensions.AspNetCore.OpenApi; using Furly.Extensions.Configuration; @@ -127,6 +128,25 @@ public static void AddIoTEdgeServices(this ContainerBuilder builder, } } + /// + /// Add Event Hubs client + /// + /// + /// + public static void AddEventHubsClient(this ContainerBuilder builder, + IConfiguration configuration) + { + // Validate edge configuration + var eventHubsOptions = new EventHubsClientOptions(); + new EventHubs(configuration).Configure(eventHubsOptions); + if (eventHubsOptions.ConnectionString != null) + { + builder.AddHubEventClient(); + builder.RegisterType() + .AsImplementedInterfaces(); + } + } + /// /// Add file system client /// @@ -985,6 +1005,47 @@ public IoTEdge(IConfiguration configuration) } } + /// + /// Configure event hub client + /// + internal sealed class EventHubs : ConfigureOptionBase + { + /// + /// Configuration + /// + public const string SchemaGroupNameKey = "SchemaGroupName"; + public const string EventHubNamespaceConnectionString = "EventHubNamespaceConnectionString"; + + /// + public override void Configure(string? name, EventHubsClientOptions options) + { + if (string.IsNullOrEmpty(options.ConnectionString)) + { + options.ConnectionString = GetStringOrDefault(EventHubNamespaceConnectionString); + } + + var schemaGroupName = GetStringOrDefault(SchemaGroupNameKey); + + if (!string.IsNullOrEmpty(schemaGroupName)) + { + options.SchemaRegistry = new SchemaRegistryOptions + { + FullyQualifiedNamespace = string.Empty, // TODO: Remove + SchemaGroupName = schemaGroupName + }; + } + } + + /// + /// Transport configuration + /// + /// + public EventHubs(IConfiguration configuration) + : base(configuration) + { + } + } + /// /// Parse connection string as dictionary /// diff --git a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Startup.cs b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Startup.cs index e8c2a3f10a..e44127df15 100644 --- a/src/Azure.IIoT.OpcUa.Publisher.Module/src/Startup.cs +++ b/src/Azure.IIoT.OpcUa.Publisher.Module/src/Startup.cs @@ -161,6 +161,7 @@ public virtual void ConfigureContainer(ContainerBuilder builder) builder.AddFileSystemEventClient(Configuration); builder.AddHttpEventClient(Configuration); builder.AddDaprPubSubClient(Configuration); + builder.AddEventHubsClient(Configuration); builder.AddMqttClient(Configuration); builder.AddIoTEdgeServices(Configuration);