diff --git a/river-monitoring-service/Server/build.gradle.kts b/river-monitoring-service/Server/build.gradle.kts index b943348..3dfa746 100644 --- a/river-monitoring-service/Server/build.gradle.kts +++ b/river-monitoring-service/Server/build.gradle.kts @@ -17,7 +17,10 @@ repositories { } dependencies { - implementation("io.vertx:vertx-mqtt:4.5.1") //for mqtt vertx server + implementation("io.vertx:vertx-mqtt:4.2.6") + implementation("io.vertx:vertx-core:4.2.6") + implementation("io.vertx:vertx-web:4.2.6") + implementation("io.vertx:vertx-web-client:4.2.6") implementation("com.google.code.gson:gson:2.8.8") //for json implementation("io.github.java-native:jssc:2.9.6") //for serial communication implementation("org.slf4j:slf4j-api:1.7.32") diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/Main.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/Main.java index c05c3c7..98c0e36 100644 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/Main.java +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/Main.java @@ -2,10 +2,13 @@ import com.google.gson.stream.MalformedJsonException; +import io.vertx.core.Vertx; import org.mqttserver.policy.ChannelControllerManager; import org.mqttserver.policy.ChannelControllerManagerImpl; import org.mqttserver.presentation.JSONUtils; import org.mqttserver.presentation.MessageFromArduino; +import org.mqttserver.services.HTTP.DataService; +import org.mqttserver.services.HTTP.HTTPServerImpl; import org.mqttserver.services.MQTT.Broker; import org.mqttserver.services.MQTT.BrokerImpl; @@ -16,13 +19,27 @@ public static void main(String[] args) throws Exception { //start the broker (MQTTServer) Broker broker = new BrokerImpl(); broker.initialize(broker.getMqttServer()); + + //start the httpServer and DataService (for dashboard and http server) + Vertx vertx = Vertx.vertx(); + HTTPServerImpl httpServer = new HTTPServerImpl(); + vertx.deployVerticle(httpServer); + DataService service = new DataService(8050, broker); + vertx.deployVerticle(service); + //Init Channel Controller Manager - ChannelControllerManager channelControllerManager = new ChannelControllerManagerImpl(broker, null); + ChannelControllerManager channelControllerManager = new ChannelControllerManagerImpl(broker); + while (true) { - channelControllerManager.sendMessageToArduino(broker.getSystemController().getStatus()); //I send the message to arduino with state - String msg = channelControllerManager.receiveDataFromArduino(); //i receive the answer from arduino - broker.getSystemController().checkValveValue(msg, broker); //check valve value - Thread.sleep(200); + + if (!broker.getSystemController().getIsManual()) { + channelControllerManager.sendMessageToArduino(broker.getSystemController().getStatus()); //I send the message to arduino with state + //String msg = channelControllerManager.receiveDataFromArduino(); //I receive the answer from arduino + //broker.getSystemController().checkValveValue(msg, broker); //check valve value + } else { + channelControllerManager.sendMessageToArduino(broker.getSystemController().getValveValue()); //I send the message to arduino with state + } + Thread.sleep(400); } } } \ No newline at end of file diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/ChannelControllerManager.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/ChannelControllerManager.java index 10749f2..4280601 100644 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/ChannelControllerManager.java +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/ChannelControllerManager.java @@ -8,18 +8,11 @@ public interface ChannelControllerManager { - //ARDUINO void sendMessageToArduino(Status status); - String receiveDataFromArduino() throws InterruptedException; - - //DASHBOARD - void sendMessageToDashboard(MessageToDashboard message); - - - - + void sendMessageToArduino(int valveValue); + String receiveDataFromArduino() throws InterruptedException; } diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/ChannelControllerManagerImpl.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/ChannelControllerManagerImpl.java index 00726d4..144e952 100644 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/ChannelControllerManagerImpl.java +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/ChannelControllerManagerImpl.java @@ -5,32 +5,25 @@ import org.mqttserver.serial.SerialCommChannelImpl; import org.mqttserver.serial.SerialScanner; import org.mqttserver.serial.SerialScannerImpl; -import org.mqttserver.services.HTTP.HTTPServer; import org.mqttserver.services.MQTT.Broker; public class ChannelControllerManagerImpl implements ChannelControllerManager { private Broker broker; - - private HTTPServer httpServer; - private SerialCommChannel serialCommChannel; private SerialScanner serialScanner = new SerialScannerImpl(); - public ChannelControllerManagerImpl(Broker broker, HTTPServer httpServer) throws Exception { + public ChannelControllerManagerImpl(Broker broker) throws Exception { //Init broker and http server this.broker = broker; - this.httpServer = httpServer; //init serial communication this.serialCommChannel = new SerialCommChannelImpl(this.serialScanner.getConnectedPort(), 9600 ); System.out.println("Started CHANNEL CONTROLLER " + "\nChannel Controller Controls MQTTServer: " + this.broker.getMqttServer().toString() + " ON PORT: " + this.broker.getMqttServer().actualPort()); } - - @Override public void sendMessageToArduino(Status status) { MessageToArduino messageToArduino = new MessageToArduino(status); @@ -38,14 +31,13 @@ public void sendMessageToArduino(Status status) { } @Override - public String receiveDataFromArduino() throws InterruptedException { //USE SERIAL PROTOCOL - return this.serialCommChannel.receiveMessageFromArduino(); + public void sendMessageToArduino(int valveValue) { + MessageToArduino messageToArduino = new MessageToArduino(valveValue); + this.serialCommChannel.sendMessageToArduino(JSONUtils.objectToJson(messageToArduino)); } @Override - public void sendMessageToDashboard(MessageToDashboard message) { //USE HTTP PROTOCOL - + public String receiveDataFromArduino() throws InterruptedException { //USE SERIAL PROTOCOL + return this.serialCommChannel.receiveMessageFromArduino(); } - - } diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/SystemController.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/SystemController.java index 4af2d0f..4fbfeb4 100644 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/SystemController.java +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/SystemController.java @@ -11,10 +11,19 @@ public interface SystemController { void setWL(float wl); Status getStatus(); + float getWl(); + + int getValveValue(); + + void setValveValueFromDashboard(int valveValue); Map getStatusValveValue(); int getFrequency(); void checkValveValue(String msg, Broker broker); + + void setIsManual(boolean isManual); + + boolean getIsManual(); } diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/SystemControllerImpl.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/SystemControllerImpl.java index f4c9de7..fdaf452 100644 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/SystemControllerImpl.java +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/policy/SystemControllerImpl.java @@ -3,6 +3,7 @@ import io.vertx.core.buffer.Buffer; import org.mqttserver.presentation.JSONUtils; import org.mqttserver.presentation.MessageFromArduino; +import org.mqttserver.presentation.MessageToArduino; import org.mqttserver.presentation.Status; import org.mqttserver.services.MQTT.Broker; @@ -13,6 +14,11 @@ public class SystemControllerImpl implements SystemController { private Status status = null; + private int valveValue = 0; + + private float wl = 0; + + private boolean isManual = false; private final double WL1 = 5; private final double WL2 = 20; private final double WL3 = 25; @@ -22,9 +28,9 @@ public class SystemControllerImpl implements SystemController { private int frequency = 1; - private final int F1 = 1800; //1800ms + private final int F1 = 6000; //1800ms - private final int F2 = 1000; //1000ms + private final int F2 = 2000; //1000ms private final int F0 = 0; @@ -54,6 +60,7 @@ public SystemControllerImpl() { public void setWL(float wl) { System.out.println("WL RECEIVED VALUE: " + wl); if (wl > INVALID_WL) { //INVALID WL = -1; + this.wl = wl; if (wl < WL1) { this.status = Status.ALARM_TOO_LOW; } else if (wl > WL1 && wl <= WL2) { @@ -73,7 +80,7 @@ public void setWL(float wl) { this.status = Status.INVALID_STATUS; } - System.out.println("SET SYSTEM STATUS: " + this.status); + System.out.println("SET SYSTEM STATUS: " + this.status.toString().toUpperCase()); } public Status getStatus() { @@ -81,9 +88,10 @@ public Status getStatus() { System.err.println("SERVER: STATUS undefined, check your connection to sensor"); return null; } - return this.status; //return this.status + return this.status; } + public Map getStatusValveValue() { return this.statusValveValue; } @@ -93,6 +101,24 @@ public int getFrequency() { return this.frequency; } + @Override + public float getWl() { + return this.wl; + } + + @Override + public int getValveValue() { + return this.valveValue; + } + + @Override + public void setValveValueFromDashboard(int valveValue) { + this.valveValue = valveValue; + //MessageToArduino messageToArduino = new MessageToArduino(statusValveValue); + + } + + @Override public void checkValveValue(String msg, Broker broker) { try { @@ -100,6 +126,8 @@ public void checkValveValue(String msg, Broker broker) { Integer valveValue = JSONUtils.jsonToObject(msg, MessageFromArduino.class).getValveValue(); if (valveValue.equals(broker.getSystemController().getStatusValveValue().get(broker.getSystemController().getStatus()))) { System.out.println("SERVER: Valve value ok"); + this.valveValue = valveValue; + } else { System.err.println("SERVER: Valve value incorrect for system state"); } @@ -108,4 +136,16 @@ public void checkValveValue(String msg, Broker broker) { System.err.println("Il server รจ In attesa di dati validi da parte di Arduino...."); } } + + @Override + public void setIsManual(boolean isManual) { + this.isManual = isManual; + } + + @Override + public boolean getIsManual() { + return this.isManual; + } + + } diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageFromDashboard.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageFromDashboard.java new file mode 100644 index 0000000..9ae9700 --- /dev/null +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageFromDashboard.java @@ -0,0 +1,16 @@ +package org.mqttserver.presentation; + + +/*The message sent by Dashboard using POST method in HTTP Server*/ + +public class MessageFromDashboard { + private final int valveValue; + + public MessageFromDashboard(int valveValue) { + this.valveValue = valveValue; + } + public int getValveValue() { + return valveValue; + } + +} diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageToArduino.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageToArduino.java index 26f311e..49bf1c7 100644 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageToArduino.java +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageToArduino.java @@ -8,10 +8,15 @@ public class MessageToArduino { private Status status; + private int valveValue; + public MessageToArduino(Status status) { this.status = status; } + public MessageToArduino(int valveValue) { + this.valveValue = valveValue; + } public Status getStatus() { return this.status; } diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageToDashboard.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageToDashboard.java index 13d633f..84500f8 100644 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageToDashboard.java +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/MessageToDashboard.java @@ -1,9 +1,29 @@ package org.mqttserver.presentation; -/** - * This class contains the representation of the JSON message to send to dashboard by - * HTTPServer. TODO: TO DEFINE THE MESSAGE - * */ public class MessageToDashboard { + + private float WL; + private Status status; + + private int valveValue; + + public MessageToDashboard(float WL, Status status, int valveValue) { + this.WL = WL; + this.status = status; + this.valveValue = valveValue; + } + + public float getWL() { + return WL; + } + + public Status getStatus() { + return status; + } + + public int getValveValue() { + return valveValue; + } + } diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/SerialMessage.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/SerialMessage.java deleted file mode 100644 index 168767d..0000000 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/presentation/SerialMessage.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.mqttserver.presentation; - - - -/* -* This class is just for test purpose -* */ -public class SerialMessage { - private String key; - private int number; - private boolean is_true; - - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - public int getNumber() { - return number; - } - - public void setNumber(int number) { - this.number = number; - } - - public boolean isIs_true() { - return is_true; - } - - public void setIs_true(boolean is_true) { - this.is_true = is_true; - } -} \ No newline at end of file diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/services/HTTP/DataService.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/services/HTTP/DataService.java new file mode 100644 index 0000000..174d969 --- /dev/null +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/services/HTTP/DataService.java @@ -0,0 +1,68 @@ +package org.mqttserver.services.HTTP; + +import io.vertx.core.AbstractVerticle; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; +import io.vertx.ext.web.handler.BodyHandler; +import org.mqttserver.services.MQTT.Broker; + +import java.util.LinkedList; + +public class DataService extends AbstractVerticle { + + private final int port; + private final Broker broker; + + public DataService(int port, Broker broker) { + this.port = port; + this.broker = broker; + } + + @Override + public void start() { + Router router = Router.router(vertx); + router.route().handler(BodyHandler.create()); + router.post("/api/postdata").handler(this::handleAddNewData); + router.get("/api/systemdata").handler(this::handleGetData); + + vertx.createHttpServer().requestHandler(router).listen(port); + System.out.println("Service ready on port: " + port); + } + + private void handleAddNewData(RoutingContext routingContext) { + HttpServerResponse response = routingContext.response(); + JsonObject res = routingContext.getBodyAsJson(); + if (res == null) { + sendError(400, response); + } else { + int valveValue = res.getInteger("valveValue"); + boolean isManual = res.getBoolean("isManual"); + System.err.println("Valve value received from dashboard: " + valveValue + " and isManual: " + isManual); + response.setStatusCode(200).end(); + broker.getSystemController().setValveValueFromDashboard(valveValue); + broker.getSystemController().setIsManual(isManual); + } + } + + private void handleGetData(RoutingContext routingContext) { + JsonArray arr = new JsonArray(); + JsonObject data = new JsonObject(); + data.put("status", broker.getSystemController().getStatus()); + data.put("valveValue", broker.getSystemController().getValveValue()); + data.put("wl", broker.getSystemController().getWl()); + arr.add(data); + + routingContext.response() + .putHeader("content-type", "application/json") + .end(arr.encodePrettily()); + } + + private void sendError(int statusCode, HttpServerResponse response) { + response.setStatusCode(statusCode).end(); + } + + +} \ No newline at end of file diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/services/HTTP/HTTPServer.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/services/HTTP/HTTPServer.java deleted file mode 100644 index 6aa7dcc..0000000 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/services/HTTP/HTTPServer.java +++ /dev/null @@ -1,4 +0,0 @@ -package org.mqttserver.services.HTTP; -public interface HTTPServer { - -} diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/services/HTTP/HTTPServerImpl.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/services/HTTP/HTTPServerImpl.java new file mode 100644 index 0000000..73f29c4 --- /dev/null +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/services/HTTP/HTTPServerImpl.java @@ -0,0 +1,25 @@ +package org.mqttserver.services.HTTP; +import io.vertx.core.AbstractVerticle; +import io.vertx.core.MultiMap; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.Router; +import org.mqttserver.services.MQTT.Broker; + +public class HTTPServerImpl extends AbstractVerticle { + + @Override + public void start() throws Exception { + // Create a Router + Router router = Router.router(vertx); + // Create the HTTP server + vertx.createHttpServer() + // Handle every request using the router + .requestHandler(router) + // Start listening + .listen(8050) + // Print the port + .onSuccess(server -> System.out.println("HTTP server started on port " + server.actualPort())); + } + +} diff --git a/river-monitoring-service/Server/src/main/java/org/mqttserver/services/MQTT/BrokerImpl.java b/river-monitoring-service/Server/src/main/java/org/mqttserver/services/MQTT/BrokerImpl.java index 50ff3c1..16602ee 100644 --- a/river-monitoring-service/Server/src/main/java/org/mqttserver/services/MQTT/BrokerImpl.java +++ b/river-monitoring-service/Server/src/main/java/org/mqttserver/services/MQTT/BrokerImpl.java @@ -24,12 +24,9 @@ public class BrokerImpl implements Broker { - private final String HOST = "broker"; protected MqttServer mqttServer = null; - protected MqttServerOptions mqttServerOptions = null; - private final Vertx vertx; private final List subscribedClients = new ArrayList<>(); @@ -61,30 +58,40 @@ public void initialize(MqttServer mqttServer) { }); endpoint.publishHandler(message -> { - System.out.println("Received message on topic " + message.topicName() + " with payload: " + message.payload().toString()); - if (!subscribedClients.isEmpty()) { - for (MqttEndpoint client : subscribedClients) { - client.publish(message.topicName(), message.payload(), MqttQoS.valueOf(0), false, false); - } - } - if (Objects.equals(message.topicName(), "/sensor/wl")) { + if (!this.getSystemController().getIsManual()) { - //setta lo stato del sistema in base al valore rilevato dell acqua - MessageFromSensor messageObj = new MessageFromSensor(JSONUtils.jsonToObject(message.payload().toString(), MessageFromSensor.class).getWL()); - System.out.println("Value received from ESP32 (sensor): " + messageObj.getWL()); - this.updateSystem(messageObj.getWL()); + System.out.println("Received message on topic " + message.topicName() + " with payload: " + message.payload().toString()); - //pubblicare la frequenza su tutti i client in base allo stato - if (this.systemController.getFrequency() != 1) { - MessageToSensor messageToSensor = new MessageToSensor(String.valueOf(this.systemController.getFrequency())); - System.out.println("MESSAGE TO SENSOR: " + JSONUtils.objectToJson(messageToSensor)); + if (!subscribedClients.isEmpty()) { for (MqttEndpoint client : subscribedClients) { - client.publish("sensor/freq", Buffer.buffer(JSONUtils.objectToJson(messageToSensor)), MqttQoS.valueOf(0), false, false); + client.publish(message.topicName(), message.payload(), MqttQoS.valueOf(0), false, false); } } + if (Objects.equals(message.topicName(), "/sensor/wl")) { + + //setta lo stato del sistema in base al valore rilevato dell acqua + + MessageFromSensor messageObj = new MessageFromSensor(JSONUtils.jsonToObject(message.payload().toString(), MessageFromSensor.class).getWL()); + System.out.println("Value received from ESP32 (sensor): " + messageObj.getWL()); + this.updateSystem(messageObj.getWL()); + + + //pubblicare la frequenza su tutti i client in base allo stato + if (this.systemController.getFrequency() != 1) { + MessageToSensor messageToSensor = new MessageToSensor(String.valueOf(this.systemController.getFrequency())); + System.out.println("MESSAGE TO SENSOR: " + JSONUtils.objectToJson(messageToSensor)); + + for (MqttEndpoint client : subscribedClients) { + client.publish("sensor/freq", Buffer.buffer(JSONUtils.objectToJson(messageToSensor)), MqttQoS.valueOf(0), false, false); + } + } + + } + } else { + System.out.println("The Sensor has been sent the message but OPERATOR set the system in manual mode, waiting for automatic mode..."); } });