diff --git a/KafkaClient.cs b/KafkaClient.cs index 2036e85..fb08d87 100644 --- a/KafkaClient.cs +++ b/KafkaClient.cs @@ -1,84 +1,81 @@ - -namespace Opc.Ua.Cloud.Commander -{ - using Confluent.Kafka; + +namespace Opc.Ua.Cloud.Commander +{ + using Confluent.Kafka; using Newtonsoft.Json; - using Org.BouncyCastle.Asn1.Ocsp; using Serilog; using System; - using System.Collections.Generic; - using System.Text; - using System.Threading; + using System.Text; using System.Threading.Tasks; - public class KafkaClient - { - private ApplicationConfiguration _appConfig = null; - - private IProducer _producer = null; - private IConsumer _consumer = null; - - public KafkaClient(ApplicationConfiguration appConfig) - { - _appConfig = appConfig; - } - - public void Connect() - { - try - { - // create Kafka client - var config = new ProducerConfig { - BootstrapServers = Environment.GetEnvironmentVariable("BROKERNAME") + ":9093", - MessageTimeoutMs = 10000, - SecurityProtocol = SecurityProtocol.SaslSsl, - SaslMechanism = SaslMechanism.Plain, - SaslUsername = Environment.GetEnvironmentVariable("USERNAME"), - SaslPassword = Environment.GetEnvironmentVariable("PASSWORD") - }; - - _producer = new ProducerBuilder(config).Build(); - - var conf = new ConsumerConfig - { - GroupId = Environment.GetEnvironmentVariable("CLIENTNAME"), - BootstrapServers = Environment.GetEnvironmentVariable("BROKERNAME") + ":9093", - AutoOffsetReset = AutoOffsetReset.Earliest, - SecurityProtocol= SecurityProtocol.SaslSsl, - SaslMechanism = SaslMechanism.Plain, - SaslUsername = Environment.GetEnvironmentVariable("USERNAME"), - SaslPassword= Environment.GetEnvironmentVariable("PASSWORD") - }; - - _consumer = new ConsumerBuilder(conf).Build(); - - _consumer.Subscribe(Environment.GetEnvironmentVariable("TOPIC")); - - _ = Task.Run(() => HandleCommand()); - - Log.Logger.Information("Connected to Kafka broker."); - - } - catch (Exception ex) - { - Log.Logger.Error("Failed to connect to Kafka broker: " + ex.Message); - } - } - - public void Publish(string payload) - { + public class KafkaClient + { + private ApplicationConfiguration _appConfig = null; + + private IProducer _producer = null; + private IConsumer _consumer = null; + + public KafkaClient(ApplicationConfiguration appConfig) + { + _appConfig = appConfig; + } + + public void Connect() + { + try + { + // create Kafka client + var config = new ProducerConfig { + BootstrapServers = Environment.GetEnvironmentVariable("BROKERNAME") + ":9093", + MessageTimeoutMs = 10000, + SecurityProtocol = SecurityProtocol.SaslSsl, + SaslMechanism = SaslMechanism.Plain, + SaslUsername = Environment.GetEnvironmentVariable("USERNAME"), + SaslPassword = Environment.GetEnvironmentVariable("PASSWORD") + }; + + _producer = new ProducerBuilder(config).Build(); + + var conf = new ConsumerConfig + { + GroupId = Environment.GetEnvironmentVariable("CLIENTNAME"), + BootstrapServers = Environment.GetEnvironmentVariable("BROKERNAME") + ":9093", + AutoOffsetReset = AutoOffsetReset.Earliest, + SecurityProtocol= SecurityProtocol.SaslSsl, + SaslMechanism = SaslMechanism.Plain, + SaslUsername = Environment.GetEnvironmentVariable("USERNAME"), + SaslPassword= Environment.GetEnvironmentVariable("PASSWORD") + }; + + _consumer = new ConsumerBuilder(conf).Build(); + + _consumer.Subscribe(Environment.GetEnvironmentVariable("TOPIC")); + + _ = Task.Run(() => HandleCommand()); + + Log.Logger.Information("Connected to Kafka broker."); + + } + catch (Exception ex) + { + Log.Logger.Error("Failed to connect to Kafka broker: " + ex.Message); + } + } + + public void Publish(string payload) + { Message message = new() - { - Headers = new Headers() { { "Content-Type", Encoding.UTF8.GetBytes("application/json") } }, - Value = payload - }; - - _producer.ProduceAsync(Environment.GetEnvironmentVariable("RESPONSE_TOPIC"), message).GetAwaiter().GetResult(); - } - - // handles all incoming commands form the cloud - private void HandleCommand() - { + { + Headers = new Headers() { { "Content-Type", Encoding.UTF8.GetBytes("application/json") } }, + Value = payload + }; + + _producer.ProduceAsync(Environment.GetEnvironmentVariable("RESPONSE_TOPIC"), message).GetAwaiter().GetResult(); + } + + // handles all incoming commands form the cloud + private void HandleCommand() + { while (true) { ResponseModel response = new() @@ -106,25 +103,25 @@ private void HandleCommand() response.CorrelationId = request.CorrelationId; // route this to the right handler - if (request.Command == "methodcall") + if (request.Command == "MethodCall") { new UAClient().ExecuteUACommand(_appConfig, requestPayload); Log.Logger.Information($"Call succeeded, sending response to broker..."); response.Success = true; } - else if (request.Command == "read") + else if (request.Command == "Read") { response.Status = new UAClient().ReadUAVariable(_appConfig, requestPayload); Log.Logger.Information($"Read succeeded, sending response to broker..."); response.Success = true; } - else if (request.Command == "historyread") + else if (request.Command == "HistoricalRead") { response.Status = new UAClient().ReadUAHistory(_appConfig, requestPayload); Log.Logger.Information($"History read succeeded, sending response to broker..."); response.Success = true; } - else if (request.Command == "write") + else if (request.Command == "Write") { new UAClient().WriteUAVariable(_appConfig, requestPayload); Log.Logger.Information($"Write succeeded, sending response to broker..."); @@ -150,7 +147,7 @@ private void HandleCommand() // send error to Kafka broker Publish(JsonConvert.SerializeObject(response)); } - } - } - } + } + } + } } \ No newline at end of file diff --git a/MQTTClient.cs b/MQTTClient.cs index 264feaa..2373dd7 100644 --- a/MQTTClient.cs +++ b/MQTTClient.cs @@ -260,22 +260,22 @@ private async Task HandleMessageAsync(MqttApplicationMessageReceivedEventArgs ar response.CorrelationId = request.CorrelationId; // route this to the right handler - if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "MethodCall")) + if (request.Command == "MethodCall") { new UAClient().ExecuteUACommand(_uAApplication, requestPayload); response.Success = true; } - else if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "Read")) + else if (request.Command == "Read") { response.Status = new UAClient().ReadUAVariable(_uAApplication, requestPayload); response.Success = true; } - else if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "HistoricalRead")) + else if (request.Command == "HistoricalRead") { response.Status = new UAClient().ReadUAHistory(_uAApplication, requestPayload); response.Success = true; } - else if (args.ApplicationMessage.Topic.StartsWith(requestTopic.TrimEnd('#') + "Write")) + else if (request.Command == "Write") { new UAClient().WriteUAVariable(_uAApplication, requestPayload); response.Success = true; diff --git a/README.md b/README.md index 1658030..b1b60e9 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ Reads a UA Node on an OPC UA server that must be in the UA Cloud Commander's net ```json { - "Command": "read", + "Command": "Read", "CorrelationId": "D892A987-56FB-4724-AF14-5EC6A7EBDD07", // a GUID "TimeStamp": "2022-11-28T12:01:00.0923534Z", // sender timestamp in UTC "Endpoint": "opc.tcp://myopcserver.contoso/UA/", @@ -65,7 +65,7 @@ Reads the histroy for a UA Node on an OPC UA server that must be in the UA Cloud ```json { - "Command": "read", + "Command": "HistorialRead", "CorrelationId": "D892A987-56FB-4724-AF14-5EC6A7EBDD07", // a GUID "TimeStamp": "2022-11-28T12:01:00.0923534Z", // sender timestamp in UTC "Endpoint": "opc.tcp://myopcserver.contoso/UA/", @@ -81,7 +81,7 @@ Writes a UA Node on an OPC UA server that must be in the UA Cloud Commander's ne ```json { - "Command": "write", + "Command": "Write", "CorrelationId": "D892A987-56FB-4724-AF14-5EC6A7EBDD07", // a GUID "TimeStamp": "2022-11-28T12:01:00.0923534Z", // sender timestamp in UTC "Endpoint": "opc.tcp://myopcserver.contoso/UA/", @@ -101,7 +101,7 @@ Executes a command on an OPC UA server that must be in the UA Cloud Commander's ```json { - "Command": "methodcall", + "Command": "MethodCall", "CorrelationId": "D892A987-56FB-4724-AF14-5EC6A7EBDD07", // a GUID "TimeStamp": "2022-11-28T12:01:00.0923534Z", // sender timestamp in UTC "Endpoint": "opc.tcp://myopcserver.contoso/UA/", diff --git a/UACloudCommander.csproj b/UACloudCommander.csproj index 47f13db..6949fa4 100644 --- a/UACloudCommander.csproj +++ b/UACloudCommander.csproj @@ -28,12 +28,12 @@ - + - - - + + +