Skip to content

Commit

Permalink
Add experimental transports (Azure#2238)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcschier authored Jun 6, 2024
1 parent b94562d commit 7cbcae8
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ public enum WriterGroupTransport
[EnumMember(Value = "Mqtt")]
Mqtt,

/// <summary>
/// Azure Event Hub
/// </summary>
[EnumMember(Value = "EventHub")]
EventHub,

/// <summary>
/// Dapr
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
<PackageReference Include="Furly.Extensions.Mqtt" Version="1.0.48" />
<PackageReference Include="Furly.Extensions.Dapr" Version="1.0.48" />
<PackageReference Include="Furly.Extensions.MessagePack" Version="1.0.48" />
<PackageReference Include="Furly.Azure.EventHubs" Version="1.0.48" />
<PackageReference Include="Furly.Azure.IoT" Version="1.0.48" />
<PackageReference Include="Furly.Tunnel" Version="1.0.48" />
<PackageReference Include="Mono.Options" Version="6.12.0.148" />
<PackageReference Include="OpenTelemetry.Extensions.Hosting" Version="1.8.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ public CommandLine(string[] args, CommandLineLogger? logger = null)
{ $"ht|ih=|iothubprotocol=|{Configuration.IoTEdge.HubTransport}=",
$"Protocol to use for communication with EdgeHub.\nAllowed values:\n `{string.Join("`\n `", Enum.GetNames(typeof(TransportOption)))}`\nDefault: `{nameof(TransportOption.Mqtt)}` if device or edge hub connection string is provided, ignored otherwise.\n",
(TransportOption p) => this[Configuration.IoTEdge.HubTransport] = p.ToString() },
{ $"eh=|eventhubnamespaceconnectionstring=|{Configuration.EventHubs.EventHubNamespaceConnectionString}=",
"The connection string of an existing event hub namespace to use for the Azure EventHub transport.\nDefault: `not set`.\n",
eh => this[Configuration.EventHubs.EventHubNamespaceConnectionString] = eh },
{ $"sg=|schemagroup=|{Configuration.EventHubs.SchemaGroupNameKey}=",
"The schema group in an event hub namespace to publish message schemas to.\nDefault: `not set`.\n",
sg => this[Configuration.EventHubs.SchemaGroupNameKey] = sg },
{ $"d|dcs=|daprconnectionstring=|{Configuration.Dapr.DaprConnectionStringKey}=",
$"Connect the OPC Publisher to a dapr pub sub component using a connection string.\nThe connection string specifies the PubSub component to use and allows you to configure the side car connection if needed.\nUse the format 'PubSubComponent=<PubSubComponent>[;GrpcPort=<GrpcPort>;HttpPort=<HttpPort>[;Scheme=<'https'|'http'>][;Host=<IPorDnsName>]][;ApiKey=<ApiKey>]'.\nTo publish through dapr by default specify `-t={nameof(WriterGroupTransport.Dapr)}`.\nDefault: `not set`.\n",
dcs => this[Configuration.Dapr.DaprConnectionStringKey] = dcs },
Expand Down
61 changes: 61 additions & 0 deletions src/Azure.IIoT.OpcUa.Publisher.Module/src/Runtime/Configuration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace Azure.IIoT.OpcUa.Publisher.Module.Runtime
{
using Azure.IIoT.OpcUa.Publisher.Module.Controllers;
using Autofac;
using Furly.Azure.EventHubs;
using Furly.Azure.IoT.Edge;
using Furly.Extensions.AspNetCore.OpenApi;
using Furly.Extensions.Configuration;
Expand Down Expand Up @@ -127,6 +128,25 @@ public static void AddIoTEdgeServices(this ContainerBuilder builder,
}
}

/// <summary>
/// Add Event Hubs client
/// </summary>
/// <param name="builder"></param>
/// <param name="configuration"></param>
public static void AddEventHubsClient(this ContainerBuilder builder,
IConfiguration configuration)
{
// Validate edge configuration
var eventHubsOptions = new EventHubsClientOptions();
new EventHubs(configuration).Configure(eventHubsOptions);
if (eventHubsOptions.ConnectionString != null)
{
builder.AddHubEventClient();
builder.RegisterType<EventHubs>()
.AsImplementedInterfaces();
}
}

/// <summary>
/// Add file system client
/// </summary>
Expand Down Expand Up @@ -985,6 +1005,47 @@ public IoTEdge(IConfiguration configuration)
}
}

/// <summary>
/// Configure event hub client
/// </summary>
internal sealed class EventHubs : ConfigureOptionBase<EventHubsClientOptions>
{
/// <summary>
/// Configuration
/// </summary>
public const string SchemaGroupNameKey = "SchemaGroupName";
public const string EventHubNamespaceConnectionString = "EventHubNamespaceConnectionString";

/// <inheritdoc/>
public override void Configure(string? name, EventHubsClientOptions options)
{
if (string.IsNullOrEmpty(options.ConnectionString))
{
options.ConnectionString = GetStringOrDefault(EventHubNamespaceConnectionString);
}

var schemaGroupName = GetStringOrDefault(SchemaGroupNameKey);

if (!string.IsNullOrEmpty(schemaGroupName))
{
options.SchemaRegistry = new SchemaRegistryOptions
{
FullyQualifiedNamespace = string.Empty, // TODO: Remove
SchemaGroupName = schemaGroupName
};
}
}

/// <summary>
/// Transport configuration
/// </summary>
/// <param name="configuration"></param>
public EventHubs(IConfiguration configuration)
: base(configuration)
{
}
}

/// <summary>
/// Parse connection string as dictionary
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions src/Azure.IIoT.OpcUa.Publisher.Module/src/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ public virtual void ConfigureContainer(ContainerBuilder builder)
builder.AddFileSystemEventClient(Configuration);
builder.AddHttpEventClient(Configuration);
builder.AddDaprPubSubClient(Configuration);
builder.AddEventHubsClient(Configuration);
builder.AddMqttClient(Configuration);
builder.AddIoTEdgeServices(Configuration);

Expand Down

0 comments on commit 7cbcae8

Please sign in to comment.