Skip to content

Commit

Permalink
Merge pull request #14 from sve2-2020ss/fix/frontend-service2
Browse files Browse the repository at this point in the history
fix fronend service amqp
  • Loading branch information
agentS authored Apr 27, 2020
2 parents d14f8e9 + 35f0bbf commit b32dbba
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ public MqttCollector(@ConfigProperty(name = "mqtt.topic-prefix") String topicPre
@Outgoing("amqp-measurement-records")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Broadcast
public Message<JsonObject> processMeasurement(MqttMessage<byte[]> message) {
public Message<String> processMeasurement(MqttMessage<byte[]> message) {
long weatherStationId = MqttMessageParser.parseWeatherStationId(message.getTopic(), this.topicPrefix);
String payload = new String(message.getPayload());
MeasurementDto measurement = MqttMessageParser.parseMeasurementCsv(payload);
LocalDateTime timestamp = LocalDateTime.now()
.truncatedTo(ChronoUnit.SECONDS);
RecordDto record = new RecordDto(weatherStationId, timestamp, measurement);
return Message.of(
RecordJsonConverter.toJson(record),
RecordJsonConverter.toJson(record).toString(),
message::ack
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.weatherStation.message;

import io.vertx.amqp.AmqpMessage;
import io.vertx.core.json.JsonObject;
import io.weatherStation.SocketManager;
import io.weatherStation.dto.RecordDto;
Expand All @@ -22,10 +23,10 @@ public AmqpConsumer(SocketManager socketManager){

@Incoming("measurement-records")
@Acknowledgment(Acknowledgment.Strategy.NONE)
public void process(JsonObject message) {
public void process(String message) {
System.out.println("received: " + message);
Jsonb jsonBuilder = JsonbBuilder.create();
RecordDto record = jsonBuilder.fromJson(message.toString(), RecordDto.class);
socketManager.broadcast(String.valueOf(record.getWeatherStationId()), message.toString());
RecordDto record = jsonBuilder.fromJson(message, RecordDto.class);
socketManager.broadcast(String.valueOf(record.getWeatherStationId()), message);
}
}
5 changes: 3 additions & 2 deletions frontend/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ quarkus.http.cors=true
amqp-username=weatherdata
amqp-password=thunderstorm

# Configure the AMQP connector to read from the `prices` queue
# Configure the AMQP connector to read from the `measurement-records` queue
mp.messaging.incoming.measurement-records.connector=smallrye-amqp
mp.messaging.incoming.measurement-records.durable=true
mp.messaging.incoming.measurement-records.containerId=frontend
mp.messaging.incoming.measurement-records.durable=false

0 comments on commit b32dbba

Please sign in to comment.