From 35f0bbfcd097eba7ad984fb78b3d74b4a5530661 Mon Sep 17 00:00:00 2001 From: Alexander Doppelbauer Date: Mon, 27 Apr 2020 20:07:33 +0200 Subject: [PATCH] fix fronend service amqp --- .../main/java/io/weatherstation/mqtt/MqttCollector.java | 4 ++-- .../main/java/io/weatherStation/message/AmqpConsumer.java | 7 ++++--- frontend/src/main/resources/application.properties | 5 +++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/collector/src/main/java/io/weatherstation/mqtt/MqttCollector.java b/collector/src/main/java/io/weatherstation/mqtt/MqttCollector.java index 049aa8b..5d76b17 100644 --- a/collector/src/main/java/io/weatherstation/mqtt/MqttCollector.java +++ b/collector/src/main/java/io/weatherstation/mqtt/MqttCollector.java @@ -27,7 +27,7 @@ public MqttCollector(@ConfigProperty(name = "mqtt.topic-prefix") String topicPre @Outgoing("amqp-measurement-records") @Acknowledgment(Acknowledgment.Strategy.MANUAL) @Broadcast - public Message processMeasurement(MqttMessage message) { + public Message processMeasurement(MqttMessage message) { long weatherStationId = MqttMessageParser.parseWeatherStationId(message.getTopic(), this.topicPrefix); String payload = new String(message.getPayload()); MeasurementDto measurement = MqttMessageParser.parseMeasurementCsv(payload); @@ -35,7 +35,7 @@ public Message processMeasurement(MqttMessage message) { .truncatedTo(ChronoUnit.SECONDS); RecordDto record = new RecordDto(weatherStationId, timestamp, measurement); return Message.of( - RecordJsonConverter.toJson(record), + RecordJsonConverter.toJson(record).toString(), message::ack ); } diff --git a/frontend/src/main/java/io/weatherStation/message/AmqpConsumer.java b/frontend/src/main/java/io/weatherStation/message/AmqpConsumer.java index fd11478..d8dbf03 100644 --- a/frontend/src/main/java/io/weatherStation/message/AmqpConsumer.java +++ b/frontend/src/main/java/io/weatherStation/message/AmqpConsumer.java @@ -1,5 +1,6 @@ package io.weatherStation.message; +import io.vertx.amqp.AmqpMessage; import io.vertx.core.json.JsonObject; import io.weatherStation.SocketManager; import io.weatherStation.dto.RecordDto; @@ -22,10 +23,10 @@ public AmqpConsumer(SocketManager socketManager){ @Incoming("measurement-records") @Acknowledgment(Acknowledgment.Strategy.NONE) - public void process(JsonObject message) { + public void process(String message) { System.out.println("received: " + message); Jsonb jsonBuilder = JsonbBuilder.create(); - RecordDto record = jsonBuilder.fromJson(message.toString(), RecordDto.class); - socketManager.broadcast(String.valueOf(record.getWeatherStationId()), message.toString()); + RecordDto record = jsonBuilder.fromJson(message, RecordDto.class); + socketManager.broadcast(String.valueOf(record.getWeatherStationId()), message); } } diff --git a/frontend/src/main/resources/application.properties b/frontend/src/main/resources/application.properties index 97bb3b9..5e54148 100644 --- a/frontend/src/main/resources/application.properties +++ b/frontend/src/main/resources/application.properties @@ -11,6 +11,7 @@ quarkus.http.cors=true amqp-username=weatherdata amqp-password=thunderstorm -# Configure the AMQP connector to read from the `prices` queue +# Configure the AMQP connector to read from the `measurement-records` queue mp.messaging.incoming.measurement-records.connector=smallrye-amqp -mp.messaging.incoming.measurement-records.durable=true \ No newline at end of file +mp.messaging.incoming.measurement-records.containerId=frontend +mp.messaging.incoming.measurement-records.durable=false \ No newline at end of file