Skip to content

Commit

Permalink
Made naming and parsing of commands consistent and updated NuGets.
Browse files Browse the repository at this point in the history
  • Loading branch information
barnstee committed May 21, 2024
1 parent 21ff92e commit 03b4f64
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 94 deletions.
161 changes: 79 additions & 82 deletions KafkaClient.cs
Original file line number Diff line number Diff line change
@@ -1,84 +1,81 @@

namespace Opc.Ua.Cloud.Commander
{
using Confluent.Kafka;

namespace Opc.Ua.Cloud.Commander
{
using Confluent.Kafka;
using Newtonsoft.Json;
using Org.BouncyCastle.Asn1.Ocsp;
using Serilog;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Text;
using System.Threading.Tasks;

public class KafkaClient
{
private ApplicationConfiguration _appConfig = null;

private IProducer<Null, string> _producer = null;
private IConsumer<Ignore, byte[]> _consumer = null;

public KafkaClient(ApplicationConfiguration appConfig)
{
_appConfig = appConfig;
}

public void Connect()
{
try
{
// create Kafka client
var config = new ProducerConfig {
BootstrapServers = Environment.GetEnvironmentVariable("BROKERNAME") + ":9093",
MessageTimeoutMs = 10000,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = Environment.GetEnvironmentVariable("USERNAME"),
SaslPassword = Environment.GetEnvironmentVariable("PASSWORD")
};

_producer = new ProducerBuilder<Null, string>(config).Build();

var conf = new ConsumerConfig
{
GroupId = Environment.GetEnvironmentVariable("CLIENTNAME"),
BootstrapServers = Environment.GetEnvironmentVariable("BROKERNAME") + ":9093",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol= SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = Environment.GetEnvironmentVariable("USERNAME"),
SaslPassword= Environment.GetEnvironmentVariable("PASSWORD")
};

_consumer = new ConsumerBuilder<Ignore, byte[]>(conf).Build();

_consumer.Subscribe(Environment.GetEnvironmentVariable("TOPIC"));

_ = Task.Run(() => HandleCommand());

Log.Logger.Information("Connected to Kafka broker.");

}
catch (Exception ex)
{
Log.Logger.Error("Failed to connect to Kafka broker: " + ex.Message);
}
}

public void Publish(string payload)
{
public class KafkaClient
{
private ApplicationConfiguration _appConfig = null;

private IProducer<Null, string> _producer = null;
private IConsumer<Ignore, byte[]> _consumer = null;

public KafkaClient(ApplicationConfiguration appConfig)
{
_appConfig = appConfig;
}

public void Connect()
{
try
{
// create Kafka client
var config = new ProducerConfig {
BootstrapServers = Environment.GetEnvironmentVariable("BROKERNAME") + ":9093",
MessageTimeoutMs = 10000,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = Environment.GetEnvironmentVariable("USERNAME"),
SaslPassword = Environment.GetEnvironmentVariable("PASSWORD")
};

_producer = new ProducerBuilder<Null, string>(config).Build();

var conf = new ConsumerConfig
{
GroupId = Environment.GetEnvironmentVariable("CLIENTNAME"),
BootstrapServers = Environment.GetEnvironmentVariable("BROKERNAME") + ":9093",
AutoOffsetReset = AutoOffsetReset.Earliest,
SecurityProtocol= SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = Environment.GetEnvironmentVariable("USERNAME"),
SaslPassword= Environment.GetEnvironmentVariable("PASSWORD")
};

_consumer = new ConsumerBuilder<Ignore, byte[]>(conf).Build();

_consumer.Subscribe(Environment.GetEnvironmentVariable("TOPIC"));

_ = Task.Run(() => HandleCommand());

Log.Logger.Information("Connected to Kafka broker.");

}
catch (Exception ex)
{
Log.Logger.Error("Failed to connect to Kafka broker: " + ex.Message);
}
}

public void Publish(string payload)
{
Message<Null, string> message = new()
{
Headers = new Headers() { { "Content-Type", Encoding.UTF8.GetBytes("application/json") } },
Value = payload
};

_producer.ProduceAsync(Environment.GetEnvironmentVariable("RESPONSE_TOPIC"), message).GetAwaiter().GetResult();
}

// handles all incoming commands form the cloud
private void HandleCommand()
{
{
Headers = new Headers() { { "Content-Type", Encoding.UTF8.GetBytes("application/json") } },
Value = payload
};

_producer.ProduceAsync(Environment.GetEnvironmentVariable("RESPONSE_TOPIC"), message).GetAwaiter().GetResult();
}

// handles all incoming commands form the cloud
private void HandleCommand()
{
while (true)
{
ResponseModel response = new()
Expand Down Expand Up @@ -106,25 +103,25 @@ private void HandleCommand()
response.CorrelationId = request.CorrelationId;

// route this to the right handler
if (request.Command == "methodcall")
if (request.Command == "MethodCall")
{
new UAClient().ExecuteUACommand(_appConfig, requestPayload);
Log.Logger.Information($"Call succeeded, sending response to broker...");
response.Success = true;
}
else if (request.Command == "read")
else if (request.Command == "Read")
{
response.Status = new UAClient().ReadUAVariable(_appConfig, requestPayload);
Log.Logger.Information($"Read succeeded, sending response to broker...");
response.Success = true;
}
else if (request.Command == "historyread")
else if (request.Command == "HistoricalRead")
{
response.Status = new UAClient().ReadUAHistory(_appConfig, requestPayload);
Log.Logger.Information($"History read succeeded, sending response to broker...");
response.Success = true;
}
else if (request.Command == "write")
else if (request.Command == "Write")
{
new UAClient().WriteUAVariable(_appConfig, requestPayload);
Log.Logger.Information($"Write succeeded, sending response to broker...");
Expand All @@ -150,7 +147,7 @@ private void HandleCommand()
// send error to Kafka broker
Publish(JsonConvert.SerializeObject(response));
}
}
}
}
}
}
}
}
8 changes: 4 additions & 4 deletions MQTTClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,22 +260,22 @@ private async Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs ar
response.CorrelationId = request.CorrelationId;

// route this to the right handler
if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "MethodCall"))
if (request.Command == "MethodCall")
{
new UAClient().ExecuteUACommand(_uAApplication, requestPayload);
response.Success = true;
}
else if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "Read"))
else if (request.Command == "Read")
{
response.Status = new UAClient().ReadUAVariable(_uAApplication, requestPayload);
response.Success = true;
}
else if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "HistoricalRead"))
else if (request.Command == "HistoricalRead")
{
response.Status = new UAClient().ReadUAHistory(_uAApplication, requestPayload);
response.Success = true;
}
else if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "Write"))
else if (request.Command == "Write")
{
new UAClient().WriteUAVariable(_uAApplication, requestPayload);
response.Success = true;
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Reads a UA Node on an OPC UA server that must be in the UA Cloud Commander's net

```json
{
"Command": "read",
"Command": "Read",
"CorrelationId": "D892A987-56FB-4724-AF14-5EC6A7EBDD07", // a GUID
"TimeStamp": "2022-11-28T12:01:00.0923534Z", // sender timestamp in UTC
"Endpoint": "opc.tcp://myopcserver.contoso/UA/",
Expand All @@ -65,7 +65,7 @@ Reads the histroy for a UA Node on an OPC UA server that must be in the UA Cloud

```json
{
"Command": "read",
"Command": "HistorialRead",
"CorrelationId": "D892A987-56FB-4724-AF14-5EC6A7EBDD07", // a GUID
"TimeStamp": "2022-11-28T12:01:00.0923534Z", // sender timestamp in UTC
"Endpoint": "opc.tcp://myopcserver.contoso/UA/",
Expand All @@ -81,7 +81,7 @@ Writes a UA Node on an OPC UA server that must be in the UA Cloud Commander's ne

```json
{
"Command": "write",
"Command": "Write",
"CorrelationId": "D892A987-56FB-4724-AF14-5EC6A7EBDD07", // a GUID
"TimeStamp": "2022-11-28T12:01:00.0923534Z", // sender timestamp in UTC
"Endpoint": "opc.tcp://myopcserver.contoso/UA/",
Expand All @@ -101,7 +101,7 @@ Executes a command on an OPC UA server that must be in the UA Cloud Commander's

```json
{
"Command": "methodcall",
"Command": "MethodCall",
"CorrelationId": "D892A987-56FB-4724-AF14-5EC6A7EBDD07", // a GUID
"TimeStamp": "2022-11-28T12:01:00.0923534Z", // sender timestamp in UTC
"Endpoint": "opc.tcp://myopcserver.contoso/UA/",
Expand Down
8 changes: 4 additions & 4 deletions UACloudCommander.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Azure.Storage.Blobs" Version="12.19.1" />
<PackageReference Include="Azure.Storage.Blobs" Version="12.20.0" />
<PackageReference Include="Confluent.Kafka" Version="2.4.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.20.1" />
<PackageReference Include="MQTTnet" Version="4.3.3.952" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua" Version="1.5.374.36" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client.ComplexTypes" Version="1.5.374.36" />
<PackageReference Include="MQTTnet" Version="4.3.5.1141" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua" Version="1.5.374.54" />
<PackageReference Include="OPCFoundation.NetStandard.Opc.Ua.Client.ComplexTypes" Version="1.5.374.54" />
<PackageReference Include="Serilog" Version="3.1.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
<PackageReference Include="Serilog.Sinks.File" Version="5.0.0" />
Expand Down

0 comments on commit 03b4f64

Please sign in to comment.